How many task managers to launch for a job?

How many task managers to launch for a job?

black chase

Hi,

I am redesigning the scheduler of the JobManager to place the tasks of a job across TaskManagers according to a scheduling policy.

I am reading the FLIP-6 proposal and found that the common case is "one TaskManager launches one slot" and "one Flink cluster serves one job". But I could not find how many TaskManagers to launch on each compute node. Is there any common practice for this?

--
Best Regards!
Pengcheng Duan

Re: How many task managers to launch for a job?

Xintong Song
Hi black,

If you are running Flink on YARN or Mesos, Flink will automatically allocate resources and launch new TaskManagers as needed.

If you are using Flink standalone mode, the easiest way is to enable slot sharing and put all the vertices into the same slot sharing group (which is the default). That way, the total number of slots needed to run the job (or the number of TaskManagers, if you configure one slot per TaskManager) is the maximum parallelism among the job graph's vertices. Further information on slot sharing can be found here.
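
To make that arithmetic concrete, here is a small sketch that does not depend on Flink. It assumes every vertex is in the default slot sharing group, so the job needs as many slots as its highest vertex parallelism. The class name SlotMath, its methods, and the example parallelism values are made up for illustration.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink: models the slot arithmetic described above.
final class SlotMath {

    // With all vertices in one slot sharing group, the job needs
    // as many slots as the largest vertex parallelism.
    static int requiredSlots(int[] vertexParallelisms) {
        return Arrays.stream(vertexParallelisms).max().orElse(0);
    }

    // Number of TaskManagers needed to provide that many slots (rounded up).
    static int requiredTaskManagers(int requiredSlots, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        return (requiredSlots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        int[] parallelisms = {5, 3, 5}; // e.g. source, map, sink (made-up values)
        int slots = requiredSlots(parallelisms);
        System.out.println("slots needed: " + slots);
        System.out.println("TaskManagers with 1 slot each: " + requiredTaskManagers(slots, 1));
        System.out.println("TaskManagers with 2 slots each: " + requiredTaskManagers(slots, 2));
    }
}
```

With one slot per TaskManager the two numbers coincide, which is why the slot count and the TaskManager count can be used interchangeably in that setup.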

Thank you~

Xintong Song



On Thu, May 23, 2019 at 11:49 PM black chase <[hidden email]> wrote:

Hi,

I am redesigning the scheduler of the JobManager to place the tasks of a job across TaskManagers according to a scheduling policy.

I am reading the FLIP-6 proposal and found that the common case is "one TaskManager launches one slot" and "one Flink cluster serves one job". But I could not find how many TaskManagers to launch on each compute node. Is there any common practice for this?

--
Best Regards!
Pengcheng Duan

Re: How many task managers to launch for a job?

black chase
Hi Song,
Thank you for the clarification.
Now I know that TaskManagers are allocated automatically. Yet I am still not clear on how the TMs are allocated.
I'm guessing the allocation process would be:
On the job side, I have a job in which each operator has parallelism=5. Since one TaskManager has one slot, this job would need 5 TaskManagers, because one pipeline needs one slot according to the task scheduling policy.
On the Mesos side, let's say there are currently no available TaskManagers. Then Mesos would spawn 5 new TaskManagers for this job. Is that right?
If my guess is right, then let's say we have 4 physical compute nodes for the Flink TaskManagers. How would Mesos place the 5 new TaskManagers on the 4 physical compute nodes? Is it just round-robin?
Kind regards
Chase



On Fri, May 24, 2019 at 4:10 AM Xintong Song <[hidden email]> wrote:
Hi black,

If you are running Flink on YARN or Mesos, Flink will automatically allocate resources and launch new TaskManagers as needed.

If you are using Flink standalone mode, the easiest way is to enable slot sharing and put all the vertices into the same slot sharing group (which is the default). That way, the total number of slots needed to run the job (or the number of TaskManagers, if you configure one slot per TaskManager) is the maximum parallelism among the job graph's vertices. Further information on slot sharing can be found here.

Thank you~

Xintong Song




How can i just implement a crontab function using flink?

wanglei2@geekplus.com.cn
In reply to this post by black chase

I want to do something every minute.

With a TumblingWindow, the function is not triggered if no message is received during that minute. But I still need to execute the function.

How can I implement this?


 

Re: How can i just implement a crontab function using flink?

Jörn Franke
Just send a dummy event from the source system every minute.
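
One way to produce such a dummy event on the source side is a scheduled task. The sketch below is plain Java and only shows the timing part. The Heartbeat class, the "TICK" payload, and the Consumer<String> standing in for a real producer (for example a message-queue client) are all assumptions, not something named in this thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical source-side helper: emits a dummy "TICK" event at a fixed rate
// so that downstream windows always receive at least one element.
final class Heartbeat implements AutoCloseable {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // 'sink' stands in for whatever actually sends the event to the source system.
    Heartbeat(Consumer<String> sink, long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(() -> sink.accept("TICK"), period, period, unit);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        // One tick per minute in production; a short period here so the demo ends quickly.
        try (Heartbeat hb = new Heartbeat(System.out::println, 100, TimeUnit.MILLISECONDS)) {
            Thread.sleep(350);
        }
    }
}
```

The Flink job then has to recognise and ignore the "TICK" payload in its window function, which is the cost of this approach.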

On 24.05.2019 at 10:20, "[hidden email]" <[hidden email]> wrote:


I want to do something every minute.

With a TumblingWindow, the function is not triggered if no message is received during that minute. But I still need to execute the function.

How can I implement this?


 

Re: How can i just implement a crontab function using flink?

Puneet Kinra-2
In reply to this post by wanglei2@geekplus.com.cn
There is the concept of a periodic watermark assigner; you can use that
if you are working with event time.

On Fri, May 24, 2019 at 1:51 PM [hidden email] <[hidden email]> wrote:

I want to do something every minute.

With a TumblingWindow, the function is not triggered if no message is received during that minute. But I still need to execute the function.

How can I implement this?


 


--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]



Re: Re: How can i just implement a crontab function using flink?

wanglei2@geekplus.com.cn
In reply to this post by Jörn Franke

Thanks, it's an alternative solution.


 
Date: 2019-05-24 16:31
Subject: Re: How can i just implement a crontab function using flink?
Just send a dummy event from the source system every minute.


Re: How many task managers to launch for a job?

Xintong Song
In reply to this post by black chase
As far as I know, Flink does not have any requirements on how the TaskManagers are distributed across physical machines. So I think it really depends on the scheduling policy of the Mesos cluster. I'm not an expert on Mesos, so correct me if I am wrong.

Thank you~

Xintong Song



On Fri, May 24, 2019 at 4:18 PM black chase <[hidden email]> wrote:
Hi Song,
Thank you for the clarification.
Now I know that TaskManagers are allocated automatically. Yet I am still not clear on how the TMs are allocated.
I'm guessing the allocation process would be:
On the job side, I have a job in which each operator has parallelism=5. Since one TaskManager has one slot, this job would need 5 TaskManagers, because one pipeline needs one slot according to the task scheduling policy.
On the Mesos side, let's say there are currently no available TaskManagers. Then Mesos would spawn 5 new TaskManagers for this job. Is that right?
If my guess is right, then let's say we have 4 physical compute nodes for the Flink TaskManagers. How would Mesos place the 5 new TaskManagers on the 4 physical compute nodes? Is it just round-robin?
Kind regards
Chase




Re: How many task managers to launch for a job?

black chase
Yes, true. I am trying to figure out how the TaskManagers are distributed across physical machines by Mesos and YARN. Maybe I should start a new thread for help.
Thank you, Song.
Best,
Pengcheng

On Fri, May 24, 2019 at 10:52 AM Xintong Song <[hidden email]> wrote:
As far as I know, Flink does not have any requirements on how the TaskManagers are distributed across physical machines. So I think it really depends on the scheduling policy of the Mesos cluster. I'm not an expert on Mesos, so correct me if I am wrong.

Thank you~

Xintong Song




Re: Re: How can i just implement a crontab function using flink?

wanglei2@geekplus.com.cn
In reply to this post by Puneet Kinra-2

Thanks, got it.


 
Date: 2019-05-24 17:02
Subject: Re: How can i just implement a crontab function using flink?
There is the concept of a periodic watermark assigner; you can use that
if you are working with event time.


Re: How many task managers to launch for a job?

black chase
In reply to this post by black chase
Hi Song,
You said "In that way, the total slots (or number of TaskManagers if you config one slot for each TaskManager)". Do you mean that one TaskManager contains one slot?
Do you have any experience with how many slots to configure per TaskManager?
I read FLIP-6; it says "For the sake of simplicity, the following talks about “slots”, but one can think simply of “TaskManager” instead, for the common case of a one-slot TaskManager."
It seems the common practice is to have one slot per TaskManager.

Best,
Chase


On Fri, May 24, 2019 at 11:28 AM black chase <[hidden email]> wrote:
Yes, true. I am trying to figure out how the TaskManagers are distributed across physical machines by Mesos and YARN. Maybe I should start a new thread for help.
Thank you, Song.
Best,
Pengcheng


How can I add config file as classpath in taskmgr node when submitting a flink job?

wanglei2@geekplus.com.cn
In reply to this post by wanglei2@geekplus.com.cn

When starting a single-node Java application, I can add config files to its classpath.

How can I do the same when submitting a Flink job? The config file needs to be read on the TaskManager node and used to initialize some classes.





Re: How many task managers to launch for a job?

Xintong Song
In reply to this post by black chase
Well, it depends on how many resources one pipeline of your job needs and how many resources are configured for each TaskExecutor. In addition, the resources of each TaskManager also depend on the job's resource needs and your environment. So having one slot per TaskManager is a simple choice, because it avoids tuning these two related factors at the same time.

Thank you~

Xintong Song



On Sat, May 25, 2019 at 4:54 AM black chase <[hidden email]> wrote:
Hi Song,
You said "In that way, the total slots (or number of TaskManagers if you config one slot for each TaskManager)". Do you mean that one TaskManager contains one slot?
Do you have any experience with how many slots to configure per TaskManager?
I read FLIP-6; it says "For the sake of simplicity, the following talks about “slots”, but one can think simply of “TaskManager” instead, for the common case of a one-slot TaskManager."
It seems the common practice is to have one slot per TaskManager.

Best,
Chase



Re: How many task managers to launch for a job?

black chase
OK, I see. Thank you very much Song
Best Regards
Chase

On Mon, May 27, 2019 at 4:01 AM Xintong Song <[hidden email]> wrote:
Well, it depends on how many resources one pipeline of your job needs and how many resources are configured for each TaskExecutor. In addition, the resources of each TaskManager also depend on the job's resource needs and your environment. So having one slot per TaskManager is a simple choice, because it avoids tuning these two related factors at the same time.

Thank you~

Xintong Song




Re: How can I add config file as classpath in taskmgr node when submitting a flink job?

Yang Wang
In reply to this post by wanglei2@geekplus.com.cn
Hi, wanglei
You could use the Flink distributed cache to register your config files and then access them in your task.
1. Register a cached file (env is your StreamExecutionEnvironment instance; registerCachedFile is an instance method)
env.registerCachedFile(inputFile.toString(), "test_data", false);
2. Access the file in your task (inside a rich function, where getRuntimeContext() is available)
final Path testFile = getRuntimeContext().getDistributedCache().getFile("test_data").toPath();
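
Once the task has the local copy of the file, reading it is ordinary Java I/O. The sketch below assumes the config is a .properties file. The ConfigLoader class and the db.url key are made up for illustration, and in a Flink job the Path would come from the getFile("test_data") call above, typically inside open().

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper: loads a .properties file from a local path.
final class ConfigLoader {

    static Properties load(Path configFile) throws IOException {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(configFile, StandardCharsets.UTF_8)) {
            props.load(reader);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the file that the distributed cache would provide.
        Path tmp = Files.createTempFile("job-config", ".properties");
        Files.write(tmp, "db.url=jdbc:example\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(load(tmp).getProperty("db.url"));
        Files.delete(tmp);
    }
}
```

Loading in open() rather than in the constructor matters because the function object is created on the client and only later shipped to the TaskManager.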

On Sun, May 26, 2019 at 12:06 AM [hidden email] <[hidden email]> wrote:

When starting a single-node Java application, I can add config files to its classpath.

How can I do the same when submitting a Flink job? The config file needs to be read on the TaskManager node and used to initialize some classes.





Re: Re: How can I add config file as classpath in taskmgr node when submitting a flink job?

wanglei2@geekplus.com.cn

Thanks. Let me give it a try.

 
Date: 2019-05-28 09:47
Subject: Re: How can I add config file as classpath in taskmgr node when submitting a flink job?
Hi, wanglei
You could use the Flink distributed cache to register your config files and then access them in your task.
1. Register a cached file (env is your StreamExecutionEnvironment instance; registerCachedFile is an instance method)
env.registerCachedFile(inputFile.toString(), "test_data", false);
2. Access the file in your task (inside a rich function, where getRuntimeContext() is available)
final Path testFile = getRuntimeContext().getDistributedCache().getFile("test_data").toPath();


Re: Re: How can i just implement a crontab function using flink?

wanglei2@geekplus.com.cn
In reply to this post by Puneet Kinra-2

I tried it, but MyProcessWindowFunction is still not triggered when there is no event in the window.

Any insight on this?


source
    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Map>() {
        @Override
        public Watermark getCurrentWatermark() {
            // The watermark trails the wall clock by 10 seconds.
            return new Watermark(System.currentTimeMillis() - 10000);
        }

        @Override
        public long extractTimestamp(Map map, long previousElementTimestamp) {
            // Use the arrival time as the event timestamp.
            return System.currentTimeMillis();
        }
    })
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
    .process(new MyProcessWindowFunction());


 

How to trigger the window function even there's no message input in this window?

wanglei2@geekplus.com.cn
In reply to this post by wanglei2@geekplus.com.cn

windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction());
How can I trigger MyProcessWindowFunction even when there is no input during the window?




Re: How to trigger the window function even there's no message input in this window?

Piotr Nowojski-3
Hi,

As far as I know, this is currently impossible.

You can work around this by implementing your own custom post-processing operator or flatMap function that would:
- track the output of the window operator
- register a processing-time timer with the desired timeout
- every time the processing-time timer fires, check whether the window operator has emitted anything in the last X seconds; if not, emit some default element
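
The three steps above can be modelled without any Flink dependency. The sketch below is only the bookkeeping: IdleEmitter, its method names, and the "EMPTY" default element are hypothetical. In a real job, onElement would correspond to the operator's per-element callback and onTimer to a processing-time timer callback that registers the next timer each time it fires.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of the workaround: forward window results,
// and emit a default element when a timer fires and nothing was seen recently.
final class IdleEmitter<T> {

    private final long timeoutMillis;
    private final T defaultElement;
    private final List<T> output = new ArrayList<>();
    private long lastSeenMillis;

    IdleEmitter(long timeoutMillis, T defaultElement, long startMillis) {
        this.timeoutMillis = timeoutMillis;
        this.defaultElement = defaultElement;
        this.lastSeenMillis = startMillis;
    }

    // Step 1: track (and forward) the output of the window operator.
    void onElement(T element, long nowMillis) {
        lastSeenMillis = nowMillis;
        output.add(element);
    }

    // Steps 2 and 3: called each time the periodic processing-time timer fires.
    void onTimer(long nowMillis) {
        if (nowMillis - lastSeenMillis >= timeoutMillis) {
            output.add(defaultElement);
            // Count the default element as activity, so only one is emitted per timeout period.
            lastSeenMillis = nowMillis;
        }
    }

    List<T> output() {
        return output;
    }

    public static void main(String[] args) {
        IdleEmitter<String> emitter = new IdleEmitter<>(10_000, "EMPTY", 0);
        emitter.onElement("window-1", 4_000); // a real window result
        emitter.onTimer(10_000);              // 6 s since the last result: nothing to do
        emitter.onTimer(20_000);              // 16 s of silence: default element
        emitter.onTimer(30_000);              // 10 s since the default: another default
        System.out.println(emitter.output()); // [window-1, EMPTY, EMPTY]
    }
}
```

Passing the current time in explicitly keeps the logic deterministic and easy to test; the actual timer wiring is left to the operator that hosts it.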

Piotrek

On 14 Jun 2019, at 12:08, [hidden email] wrote:


windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction());
How can I trigger MyProcessWindowFunction even when there is no input during the window?




Unable to restore state value after job failed using RocksDBStateBackend

wanglei2@geekplus.com.cn
In reply to this post by wanglei2@geekplus.com.cn
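import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    // The original post does not show where "log" comes from; an SLF4J logger is assumed here.
    private static final Logger log = LoggerFactory.getLogger(StateProcessTest.class);

    private transient ValueState<Tuple2<Long, Long>> state;

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
        Tuple2<Long, Long> stateValue = state.value();

        // null means no state exists yet for the current key.
        if (stateValue == null) {
            log.info("########## initialize");
            stateValue = Tuple2.of(34L, 56L);
        }
        state.update(stateValue);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                "avg", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        state = getRuntimeContext().getState(descriptor);
    }
}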



Every time I restart the job, stateValue is still null.



 