Hi everybody,
I have the following flink/kafka setup:
I have 3 kafka “input” topics and 3 “output” topics with each 1 partition (only 1 partition because the order of the messages is important). I also have 1 master and 2 flink slave nodes with a total of 16 task slots. In my flink program I have created 3 consumers - each for one of the input topics. On each of the datastreams I run a query that generates statistics over a window of 1 second and I write the result to the corresponding output topic. You can find the execution plan with parallelism set to 2 attached.
This setup with parallelism=2 sometimes seems to give me the wrong order of the statistics results. I assume it is because of the rebalancing before the last map which leads to a race condition when writing to kafka.
If I set parallelism to 1 no rebalancing will be done but only one task slot is used.
This has led me to the following questions:
Why is only 1 task slot used with my 3 pipelines when parallelism is set to 1? As far as I understand, the parallelism refers to the number of parallel instances a task can be split into. Therefore, I would assume that I could still run multiple different tasks (e.g. different maps or window functions on different streams) in different task slots, right?
And to come back to my requirement: Is it not possible to run all 3 pipelines in parallel and still keep the order of the messages and results?
I also asked these questions on stackoverflow. And it seems that I have similar trouble understanding the terms “task slot”, “subtasks” etc. like Flavio mentioning in this flink mail thread.
Thank you and I would appreciate any input!
Best regards, Ben Flink_Execution_Plan.png (453K) Download Attachment |
Hi, So I have been rearranging my architecture to where I only have one input and one output topic, each with 3 partitions and in my flink job I have one consumer and one producer running with parallelism of 3. To run in parallel, I extract the partition from the metadata information per kafka message and keyBy that very partition. The code sample is at the bottom. Now it seems though, that my tumbling window of 1 second that I run on all partitions and that I use to calculate statistics only gives output on one partition. The reason seems to be that the timestamps of partition A and B are 2 hours ahead of partition C. In the documentation I read that the event time of an operator following a keyBy (my tumbling window) is the minimum of its input streams’ event times. But is that even the case for me? Does my tumbling window have multiple input streams? I thought that each subtask of the window would get only elements from one partition and therefore the watermarks would be calculated independently per stream. I would appreciate any input! Again, my goal is to run the same queries on independent kafka streams. Best regards, Ben import org.apache.flink.api.scala._ 2017-04-14 19:22 GMT+02:00 Benjamin Reißaus <[hidden email]>:
|
Hi Benjamin, In your case, the tumbling window subtasks would each have 3 input streams, 1 for each of the 3 FlinkKafkaConsumer operator subtasks.
This is a misunderstanding. After the keyBy, the window subtasks could get input from any of the consumer subtasks, and would therefore need to wait for broadcasted watermarks from all of them. It just happens to be that in your case, each consumer subtasks will only produce records with exactly one key. Moving back a bit to your original setup: it seems like what you want to achieve is a simple window on each partition independently, and then produce the window output to a new partition. In your original setup where each topic and its corresponding output topic each has 1 partition, I’d actually just have separate jobs for each topic-to-topic pipeline, instead of bundling them into one job. Was there any specific reason for bundling them together? Cheers, Gordon On 17 April 2017 at 5:04:26 PM, Benjamin Reißaus ([hidden email]) wrote:
|
Hi Gordon, Thank you for your explanation! It has helped a lot. Just for learning completeness: Here in the docs it says “the parallelism of a stream is always that of its producing operator”. Is “producing operator” referring to the very first operator, so in my case my Kafka consumer source? Also, is there a form of visualization for my job in which I can see which subtask is mapped to which task slot and which subtask has incoming streams from which other subtask? As for my setup: My goal was to compare the performance of the same flink job under different loads. I would like to increase the load by a) increasing the number of incoming kafka messages per stream and by b) increasing the number of kafka streams. That’s why I would prefer to have 1 job only. But I suppose that at least when working with event time and watermarks this is not possible. But of course your suggestion of having multiple jobs would solve the problem of running in parallel and keeping the order. Best regards, Ben 2017-04-17 13:42 GMT+02:00 Tzu-Li (Gordon) Tai <[hidden email]>:
|
Free forum by Nabble | Edit this page |