Hi, I am redesigning the scheduler of the JobManager to place the tasks of a job across TaskManagers according to a scheduling policy. I am reading the Flip-6 proposal and found that the common case is "one TaskManager launches one slot" and "one Flink cluster serves one job". However, I did not find how many TaskManagers to launch on a computing node. Is there a common practice for this? -- Best Regards! Pengcheng Duan
Hi black, If you are running Flink on YARN or Mesos, Flink will automatically allocate resources and launch new TaskManagers as needed. If you are using Flink standalone mode, the easiest way is to enable slot sharing and put all the vertices into the same slot sharing group (which is the default). That way, the total number of slots (or the number of TaskManagers, if you configure one slot per TaskManager) needed to run the job is the maximum parallelism among the job graph's vertices. Further information on slot sharing can be found here. Thank you~ Xintong Song On Thu, May 23, 2019 at 11:49 PM black chase <[hidden email]> wrote:
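To make the slot arithmetic above concrete, here is a small plain-Java model. It is not Flink API code, and the class and method names are hypothetical. It assumes all vertices sit in one slot sharing group, so the job needs max(parallelism) slots, compared with sum(parallelism) if every subtask occupied a slot of its own.

```java
import java.util.Arrays;

/**
 * Back-of-the-envelope model (not Flink code) of the slot arithmetic.
 * With slot sharing and all vertices in one sharing group, one slot can hold one
 * parallel subtask of each vertex, so the job needs max(parallelism) slots.
 * Without sharing, every subtask needs its own slot: sum(parallelism).
 */
public class SlotEstimate {

    // Slots needed when all vertices share one slot sharing group (Flink's default).
    static int slotsWithSharing(int[] parallelisms) {
        return Arrays.stream(parallelisms).max().orElse(0);
    }

    // Slots needed if each subtask occupied a slot of its own (no sharing).
    static int slotsWithoutSharing(int[] parallelisms) {
        return Arrays.stream(parallelisms).sum();
    }

    // TaskManagers needed for a given number of slots per TaskManager (ceiling division).
    static int taskManagersNeeded(int slots, int slotsPerTaskManager) {
        return (slots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // Hypothetical job graph: source(5) -> map(5) -> sink(2)
        int[] parallelisms = {5, 5, 2};
        int shared = slotsWithSharing(parallelisms);
        System.out.println("slots with sharing:    " + shared);
        System.out.println("slots without sharing: " + slotsWithoutSharing(parallelisms));
        System.out.println("TMs at 1 slot/TM:      " + taskManagersNeeded(shared, 1));
        System.out.println("TMs at 4 slots/TM:     " + taskManagersNeeded(shared, 4));
    }
}
```

For this hypothetical graph the model gives 5 slots with sharing (12 without), which means 5 single-slot TaskManagers or 2 TaskManagers with 4 slots each.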
Hi Song, Thank you for the clarification. Now I know TaskManagers are allocated automatically, but I am still not clear on how the TMs are allocated. I'm guessing the allocation process works like this: On the job side, I have a job where each operator has parallelism=5. Since one TaskManager has one slot, this job would need 5 TaskManagers, because one pipeline needs one slot according to the task scheduling policy. On the Mesos side, let's say there are currently no available TaskManagers. Then Mesos would spawn 5 new TaskManagers for this job. Is that right? If my guess is right, let's say we have 4 physical computing nodes for the Flink TaskManagers. How would Mesos place the 5 new TaskManagers onto the 4 physical nodes? Is it just round-robin? Kind regards Chase On Fri, May 24, 2019 at 4:10 AM Xintong Song <[hidden email]> wrote:
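The round-robin placement the question asks about can be sketched as follows. This is only a hypothetical policy written to illustrate the question; it is not how Mesos or YARN actually place containers (as noted in the reply, that depends on the cluster's own scheduler), and the names are made up.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical round-robin placement of TaskManagers onto physical nodes.
 * Illustrates the policy asked about; NOT the actual Mesos or YARN behavior.
 */
public class RoundRobinPlacement {

    // Returns, for each node index, the list of TaskManager ids placed on it.
    static List<List<Integer>> place(int taskManagers, int nodes) {
        List<List<Integer>> placement = new ArrayList<>();
        for (int n = 0; n < nodes; n++) {
            placement.add(new ArrayList<>());
        }
        for (int tm = 0; tm < taskManagers; tm++) {
            placement.get(tm % nodes).add(tm); // next node in turn, wrapping around
        }
        return placement;
    }

    public static void main(String[] args) {
        // 5 single-slot TaskManagers for a parallelism-5 job, 4 physical nodes
        List<List<Integer>> placement = place(5, 4);
        for (int n = 0; n < placement.size(); n++) {
            System.out.println("node " + n + ": TMs " + placement.get(n));
        }
    }
}
```

Under this policy node 0 would host two TaskManagers (0 and 4) and the other nodes one each.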
In reply to this post by black chase
I want to do something every minute. With a TumblingWindow, the function is not triggered if no message is received during that minute, but I still need to execute the function. How can I implement this?
Just send a dummy event from the source system every minute.
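Why a dummy event helps can be shown with a plain-Java model (no Flink dependency; names are hypothetical). As in Flink, a tumbling window here only comes into existence once an element falls into it, so a silent minute produces no window result at all. Injecting one heartbeat per minute guarantees every window has at least one element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Plain-Java model of tumbling windows plus a per-minute heartbeat.
 * A window only appears once an element falls into it, mirroring Flink's
 * lazily created windows.
 */
public class HeartbeatModel {

    static final long WINDOW_MS = 60_000L;

    // Counts elements per tumbling window; windows without elements never appear.
    static TreeMap<Long, Integer> windowCounts(List<Long> timestamps) {
        TreeMap<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            long windowStart = ts - (ts % WINDOW_MS);
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    // Adds one hypothetical heartbeat at the start of every minute in [fromMs, toMs).
    static List<Long> withHeartbeats(List<Long> events, long fromMs, long toMs) {
        List<Long> all = new ArrayList<>(events);
        for (long t = fromMs; t < toMs; t += WINDOW_MS) {
            all.add(t);
        }
        return all;
    }

    public static void main(String[] args) {
        // Real events only in minutes 0 and 2; minute 1 is silent.
        List<Long> events = List.of(5_000L, 30_000L, 125_000L);
        System.out.println("without heartbeats: " + windowCounts(events).keySet());
        System.out.println("with heartbeats:    "
                + windowCounts(withHeartbeats(events, 0L, 180_000L)).keySet());
    }
}
```

Without heartbeats only the windows starting at 0 and 120000 ms exist; with them the silent minute (60000 ms) gets a window too. In a real job the window function would presumably need to recognize heartbeat elements and leave them out of the actual computation.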
In reply to this post by wanglei2@geekplus.com.cn
There is the concept of a periodic watermark assigner; you can use that if you are working with event time.
In reply to this post by Jörn Franke
Thanks, it's an alternative solution.
In reply to this post by black chase
As far as I know, Flink has no requirements on how TaskManagers are distributed across physical machines, so I think it really depends on the scheduling policy of the Mesos cluster. I'm not an expert on Mesos, so correct me if I'm wrong. Thank you~ Xintong Song On Fri, May 24, 2019 at 4:18 PM black chase <[hidden email]> wrote:
Yes, true. I am trying to figure out how Mesos and YARN distribute TaskManagers across physical machines. Maybe I should start a new thread to ask for help. Thank you, Song. Best, Pengcheng On Fri, May 24, 2019 at 10:52 AM Xintong Song <[hidden email]> wrote:
In reply to this post by Puneet Kinra-2
Thanks, got it.
In reply to this post by black chase
Hi Song, You said "In that way, the total slots (or number of TaskManagers if you config on slot for each TaskManager)". Do you mean that one TaskManager contains one slot? Do you have any experience with how many slots to configure per TaskManager? I read Flip-6, and it says "For the sake of simplicity, the following talks about “slots”, but one can think simply of “TaskManager” instead, for the common case of a one-slot TaskManager.". It seems the common practice is to have one slot per TaskManager. Best, Chase On Fri, May 24, 2019 at 11:28 AM black chase <[hidden email]> wrote:
In reply to this post by wanglei2@geekplus.com.cn
When starting a single-node Java application, I can pass a config file to it. How can I do the same when submitting a Flink job? The config file needs to be read on the TaskManager node and used to initialize some classes.
In reply to this post by black chase
Well, it depends on how many resources one pipeline of your job needs and how many resources are configured for each TaskExecutor. In addition, the resources of each TaskManager also depend on the job's resource needs and your environment. So having one slot per TaskManager is a simple choice, because it avoids tuning these two related factors at the same time. Thank you~ Xintong Song On Sat, May 25, 2019 at 4:54 AM black chase <[hidden email]> wrote:
OK, I see. Thank you very much Song Best Regards Chase On Mon, May 27, 2019 at 4:01 AM Xintong Song <[hidden email]> wrote:
In reply to this post by wanglei2@geekplus.com.cn
Hi wanglei, You could use Flink's distributed cache to register config files and then access them in your task.
1. Register a cached file on your StreamExecutionEnvironment (here called env): env.registerCachedFile(inputFile.toString(), "test_data", false);
2. Access the file in your task: final Path testFile = getRuntimeContext().getDistributedCache().getFile("test_data").toPath();
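For the "read the config and initialize some classes" part of the question, here is a minimal sketch assuming the cached file is a Java properties file. In a Flink task the Path would come from getRuntimeContext().getDistributedCache().getFile("test_data").toPath(), typically inside a rich function's open() method; here it is passed in so the sketch runs standalone. The class name and the keys "endpoint" and "batch.size" are hypothetical.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/**
 * Sketch: load a (hypothetical) properties config and build a settings object.
 * In a Flink job the Path would be obtained from the distributed cache.
 */
public class CachedConfig {

    final String endpoint;
    final int batchSize;

    CachedConfig(String endpoint, int batchSize) {
        this.endpoint = endpoint;
        this.batchSize = batchSize;
    }

    // Builds the settings from already-parsed properties, with defaults for missing keys.
    static CachedConfig fromProperties(Properties props) {
        String endpoint = props.getProperty("endpoint", "localhost");
        int batchSize = Integer.parseInt(props.getProperty("batch.size", "100"));
        return new CachedConfig(endpoint, batchSize);
    }

    // Reads the properties file at the given path (e.g. the cached "test_data" file).
    static CachedConfig load(Path configFile) throws IOException {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(configFile)) {
            props.load(reader);
        }
        return fromProperties(props);
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the cached file: write a temporary properties file.
        Path tmp = Files.createTempFile("test_data", ".properties");
        Files.writeString(tmp, "endpoint=db.internal\nbatch.size=500\n");
        CachedConfig cfg = load(tmp);
        System.out.println(cfg.endpoint + " " + cfg.batchSize);
        Files.delete(tmp);
    }
}
```

Loading in open() rather than per record means the file is parsed once per parallel task instance.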
Thanks, let me give it a try.
In reply to this post by Puneet Kinra-2
I tried it, but MyProcessWindowFunction is still not triggered when there's no event in the window. Any insight on this? source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Map>() {
In reply to this post by wanglei2@geekplus.com.cn
windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction());
How can I trigger MyProcessWindowFunction even when there's no input during the window?
Hi,
As far as I know, this is currently impossible. You can work around it by implementing your own custom post-processing operator/flatMap function that would:
- track the output of the window operator
- register a processing-time timer with some desired timeout
- every time the processing-time timer fires, check whether the window operator has emitted anything in the last X seconds; if not, emit some default element
Piotrek
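The workaround above can be modeled in plain Java to show the timer logic. In Flink this would likely live in a KeyedProcessFunction using processing-time timers (Flink timers are only available on keyed streams); here the "timer" is just a method called with the current time, and the class name and the default value "EMPTY" are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model of a post-processing step that watches window output and
 * emits a default element when nothing was emitted during the last timeout.
 */
public class TimeoutDefaultEmitter {

    private final long timeoutMs;
    private long lastEmitMs;
    private final List<String> output = new ArrayList<>();

    TimeoutDefaultEmitter(long timeoutMs, long startMs) {
        this.timeoutMs = timeoutMs;
        this.lastEmitMs = startMs;
    }

    // Called for every result the window operator emits: forward it and remember when.
    void onWindowResult(String result, long nowMs) {
        output.add(result);
        lastEmitMs = nowMs;
    }

    // Called when the processing-time timer fires (e.g. every timeoutMs).
    void onTimer(long nowMs) {
        if (nowMs - lastEmitMs >= timeoutMs) {
            output.add("EMPTY"); // nothing came out of the window in the last period
            lastEmitMs = nowMs;
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        TimeoutDefaultEmitter emitter = new TimeoutDefaultEmitter(10_000L, 0L);
        emitter.onWindowResult("window[0,10s)", 10_000L);
        emitter.onTimer(10_000L); // window just emitted, nothing added
        emitter.onTimer(20_000L); // no window output in [10s, 20s) -> default element
        emitter.onWindowResult("window[20s,30s)", 30_000L);
        emitter.onTimer(30_000L);
        System.out.println(emitter.output());
    }
}
```

Running main prints [window[0,10s), EMPTY, window[20s,30s)]: the silent 10-second period is filled by the default element while real window results pass through unchanged.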
In reply to this post by wanglei2@geekplus.com.cn
public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {
Every time I restart the job, stateValue is still null.