Kafka topic with an empty partition: parallelism issue and Timestamp monotony violated


Yassin Marzouki
Hi all,

I have a Kafka topic with two partitions. Messages within each partition are ordered by ascending timestamp.

The following code works correctly (I'm running it on my local machine, where the default parallelism is the number of cores, 8):

stream = env.addSource(myFlinkKafkaConsumer09)
stream.map(mapper)
  .assignTimestampsAndWatermarks(ascendingTimestampExtractor)
  .keyBy(0)
  .timeWindow(Time.minutes(10))
  .reduce(reducer)
  .print()

But if I explicitly set env.addSource(myFlinkKafkaConsumer09).setParallelism(n), where n > 2 (the number of partitions) and n != 8, I get a bunch of "Timestamp monotony violated" warnings. My understanding is that only 2 source subtasks will be mapped to the topic partitions, and since messages are ordered within each partition, timestamp assignment should work correctly regardless of the parallelism, as long as it is >= 2.
Question 1: What is the explanation for this?


Now I add another, empty partition to the topic. The job no longer produces any output, which is expected since it waits forever for the empty partition's watermark. What I don't understand, though, is a strange behavior when I set the parallelism explicitly at the source:
Question 2: Why do I get output if I explicitly set env.addSource(myFlinkKafkaConsumer09).setParallelism(n)? Shouldn't the empty-partition argument apply here too? And why is that output seen only when n != 8?

Best,
Yassine

Re: Kafka topic with an empty partition: parallelism issue and Timestamp monotony violated

Yassin Marzouki
I think I figured out the explanation for the first part. It looks like the stream gets redistributed and merged between the source and the map operator because their parallelisms differ, so the messages leaving the map operator are out of order. The "Timestamp monotony violated" warnings disappeared when I set the source and the map operator to the same parallelism.
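The reordering described above can be reproduced without Flink. The following is only a rough sketch in plain Java, not Flink code: the class RebalanceOrderDemo and all of its methods are hypothetical names, the two partitions are assumed to be read in lockstep, and the violation check mimics what an ascending timestamp extractor does (it complains when a timestamp is lower than the highest one seen so far).

```java
import java.util.ArrayList;
import java.util.List;

class RebalanceOrderDemo {

    /** Counts records whose timestamp is lower than the highest timestamp seen before them. */
    static int countViolations(List<Long> timestamps) {
        long current = Long.MIN_VALUE;
        int violations = 0;
        for (long ts : timestamps) {
            if (ts < current) {
                violations++;   // this is where the warning would be logged
            } else {
                current = ts;
            }
        }
        return violations;
    }

    /** Models one Kafka partition: 'count' strictly ascending timestamps starting at 'start'. */
    static List<Long> partition(long start, int count) {
        List<Long> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(start + i);
        }
        return result;
    }

    /** Forward connection: a map subtask sees exactly one source subtask, hence one partition. */
    static List<Long> forward(List<Long> partition) {
        return new ArrayList<>(partition);
    }

    /**
     * Rebalance connection: every source subtask sends its records round-robin to all
     * map subtasks. Returns what the map subtask with index 'target' receives.
     */
    static List<Long> rebalance(List<Long> a, List<Long> b, int mapParallelism, int target) {
        List<Long> received = new ArrayList<>();
        int n = Math.max(a.size(), b.size());
        for (int i = 0; i < n; i++) {
            if (i % mapParallelism != target) {
                continue;       // record i goes to another map subtask
            }
            if (i < a.size()) received.add(a.get(i));
            if (i < b.size()) received.add(b.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<Long> a = partition(1000, 16);  // partition 0 is "ahead" in event time
        List<Long> b = partition(0, 16);     // partition 1 is "behind"
        System.out.println(countViolations(forward(a)));            // prints 0
        System.out.println(countViolations(rebalance(a, b, 8, 0))); // prints 2
    }
}
```

Each partition is ordered on its own, but once a map subtask receives records from both of them, the merged sequence (here 1000, 0, 1008, 8) is no longer ascending.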
I found out about operator chaining and tried to chain the source and map operators (as described here: https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#tasks--operator-chains) so that they have the same parallelism, but I didn't succeed. Isn't env.addSource().setParallelism(n).startNewChain().map(...).disableChaining() equivalent to setting the source and map parallelism to the same value?




Re: Kafka topic with an empty partition: parallelism issue and Timestamp monotony violated

Yassin Marzouki
I think I also figured out the reason for the behavior I described when one Kafka partition is empty.
According to the JavaDocs, the DataStream partitioning is set to forward by default, i.e. each map subtask receives data from exactly one source subtask. For the stream partition coming out of the map operator that corresponds to the empty Kafka partition, the watermark does not advance, which makes the window operator wait forever.
Now if the map and source operators have different parallelisms, Flink uses rebalance partitioning to redistribute the stream, as pointed out in this mailing list thread. The watermark therefore advances for all the stream partitions output by the map operator.
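The difference between the two cases can be sketched in plain Java. This is a simplified model and not Flink code: WatermarkMinDemo and its methods are hypothetical names, a subtask's watermark is approximated by the highest timestamp it has seen, and NO_DATA stands for a subtask that never received a record. The one fact it relies on is that an operator with several inputs advances its event time to the minimum of its inputs' watermarks.

```java
import java.util.Arrays;

class WatermarkMinDemo {

    /** Watermark of a subtask that has not seen any record yet. */
    static final long NO_DATA = Long.MIN_VALUE;

    /** An operator with several input channels follows the minimum watermark of all channels. */
    static long operatorWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(NO_DATA);
    }

    /** Forward: map subtask i only sees source subtask i, so an empty source stays at NO_DATA. */
    static long[] forward(long[] latestTimestampPerSource) {
        return latestTimestampPerSource.clone();
    }

    /**
     * Rebalance: every map subtask receives records from every non-empty source,
     * so each of them has seen data (simplified here to the overall highest timestamp).
     */
    static long[] rebalance(long[] latestTimestampPerSource, int mapParallelism) {
        long highest = Arrays.stream(latestTimestampPerSource).max().orElse(NO_DATA);
        long[] result = new long[mapParallelism];
        Arrays.fill(result, highest);
        return result;
    }

    public static void main(String[] args) {
        // Two partitions with data, one empty partition.
        long[] sources = {500L, 700L, NO_DATA};
        // Forward: the window operator is stuck at Long.MIN_VALUE.
        System.out.println(operatorWatermark(forward(sources)) == NO_DATA);
        // Rebalance with 8 map subtasks: the window operator's watermark advances.
        System.out.println(operatorWatermark(rebalance(sources, 8)));
    }
}
```

In this model the rebalanced job makes progress only because every map subtask sees some records. The price is the loss of per-partition ordering, which matches the "Timestamp monotony violated" warnings from the first question.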
Some of the details regarding the partitioning were mentioned in the 0.9 docs, but unfortunately they aren't in the 1.x docs.


Re: Kafka topic with an empty partition: parallelism issue and Timestamp monotony violated

rmetzger0
Hi Yassine,

In Flink 1.1 we've added a new feature to the Kafka consumer that allows you to extract timestamps and emit watermarks per partition.
The consumers now have the following method:
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner)
With a timestamp extractor attached directly to the consumer, you don't need to worry about the parallelism of subsequent operators.
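To illustrate why per-partition extraction avoids the ordering problem, here is a small plain-Java model. It is not the consumer's actual implementation: PerPartitionWatermarkDemo and its methods are hypothetical names, and the watermark is approximated by the highest timestamp seen. The idea it sketches is one extractor state per Kafka partition inside the source, with the emitted watermark being the minimum over the partitions assigned to that source subtask.

```java
import java.util.HashMap;
import java.util.Map;

class PerPartitionWatermarkDemo {

    static final long NO_DATA = Long.MIN_VALUE;

    /** Highest timestamp seen so far, tracked separately for each assigned partition. */
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();
    private int violations = 0;

    PerPartitionWatermarkDemo(int... assignedPartitions) {
        for (int p : assignedPartitions) {
            maxTimestampPerPartition.put(p, NO_DATA);
        }
    }

    /** Checks monotony only against earlier records of the same partition. */
    void onRecord(int partition, long timestamp) {
        long current = maxTimestampPerPartition.getOrDefault(partition, NO_DATA);
        if (timestamp < current) {
            violations++;
        } else {
            maxTimestampPerPartition.put(partition, timestamp);
        }
    }

    /** The source's watermark is the minimum over all partitions it reads. */
    long watermark() {
        return maxTimestampPerPartition.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(NO_DATA);
    }

    int violations() {
        return violations;
    }

    public static void main(String[] args) {
        PerPartitionWatermarkDemo source = new PerPartitionWatermarkDemo(0, 1);
        // One source subtask reads two partitions whose event times are far apart.
        source.onRecord(0, 1000);
        source.onRecord(1, 0);
        source.onRecord(0, 1001);
        source.onRecord(1, 1);
        System.out.println(source.violations()); // prints 0
        System.out.println(source.watermark());  // prints 1
    }
}
```

Note that in this sketch an assigned partition that never delivers a record still holds the watermark at NO_DATA, so the model only speaks to the ordering warnings, not to the empty-partition case.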

