Kafka consumers are too fast for some partitions in "flatMap"-like jobs

Kafka consumers are too fast for some partitions in "flatMap"-like jobs

Oleksandr Baliev
Hello,

There is one Flink job which consumes from a Kafka topic (TOPIC_IN), simply flatMaps / maps the data and pushes it to another Kafka topic (TOPIC_OUT).
TOPIC_IN has around 30 partitions, the data is more or less sequential per partition, and the job has parallelism 30, so in theory there should be a 1:1 mapping between consumers and partitions.
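
For reference, the whole job is essentially just this (a minimal sketch assuming Flink 1.3 with the Kafka 0.10 connector; the broker address, group id and the identity flatMap are placeholders for the real values and transformation):

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class RouterJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(30); // ideally one consumer subtask per TOPIC_IN partition

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "router-job");           // placeholder

        env.addSource(new FlinkKafkaConsumer010<>("TOPIC_IN", new SimpleStringSchema(), props))
           .flatMap((String value, Collector<String> out) -> out.collect(value)) // real transformation goes here
           .returns(String.class)
           .addSink(new FlinkKafkaProducer010<>("broker:9092", "TOPIC_OUT", new SimpleStringSchema()));

        env.execute("TOPIC_IN -> flatMap/map -> TOPIC_OUT");
    }
}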

But we often see a big lag in offsets for some partitions, which should mean that some consumers are slower than others (e.g. network issues for a particular broker host, or anything else). As a result, data in the TOPIC_OUT partitions is distributed but not sequential at all.

So when some other Flink job consumes from TOPIC_OUT and uses BoundedOutOfOrdernessTimestampExtractor to generate watermarks, the differences in data timestamps can produce a lot of late data. Maybe something is missing in this setup, of course, or there is a better approach for such flatMap / map jobs.

Setting a big WindowedStream#allowedLateness or giving more time to the BoundedOutOfOrdernessTimestampExtractor will increase memory consumption and probably cause other issues, and there can still be late data anyway, which is not good for later windows.
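
On the consuming side the setup is the usual one, something like this sketch (MyEvent, getEventTime() and fromTopicOut are hypothetical stand-ins for the real record type and stream; 10 minutes is an arbitrary bound):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Watermarks lag 10 minutes behind the largest timestamp seen so far;
// anything arriving after its window's watermark has passed is "late".
DataStream<MyEvent> withTimestamps = fromTopicOut.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.minutes(10)) {
        @Override
        public long extractTimestamp(MyEvent event) {
            return event.getEventTime(); // epoch millis
        }
    });

The larger the bound, the more buffering downstream, which is exactly the memory problem above.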

One possible solution is to have some shared place to synchronize the lowest timestamp between consumers and somehow slow down consumption (Thread.sleep, wait, a while loop with a condition, ...), as in the sketch below.
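
Something crude like this is what I mean (purely a sketch of the idea; SharedMinTimestamp, MyEvent and their methods are hypothetical, e.g. backed by ZooKeeper or Redis, and that external part is exactly what I don't know how to do cleanly):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Each subtask publishes its progress and sleeps while it is too far ahead
// of the slowest subtask. Blocking here also backpressures the Kafka source.
public class ThrottlingMap extends RichMapFunction<MyEvent, MyEvent> {
    private final long maxAheadMillis;
    private transient SharedMinTimestamp shared; // hypothetical external client

    public ThrottlingMap(long maxAheadMillis) {
        this.maxAheadMillis = maxAheadMillis;
    }

    @Override
    public void open(Configuration parameters) {
        shared = SharedMinTimestamp.connect(); // hypothetical
    }

    @Override
    public MyEvent map(MyEvent event) throws Exception {
        shared.report(event.getEventTime()); // publish this subtask's position
        while (event.getEventTime() - shared.globalMin() > maxAheadMillis) {
            Thread.sleep(100); // wait for the slow partitions to catch up
        }
        return event;
    }
}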

0. Is there any good approach to handle such "Kafka -> flatMap / map -> Kafka" jobs, so that data in TOPIC_OUT stays sequential as in TOPIC_IN?

1. As far as I can see, this should be a common problem: some slow consumers for a big Kafka topic with a lot of partitions, shouldn't it? How do Flink/Kafka handle it?

2. Does somebody know whether there is any mechanism in Flink/Kafka (backpressure?) which can tell specific fast consumers, from a downstream operator (some process function, for example), to slow down a bit? Is something like a callback possible in Flink? I don't think so, but...?

3. Or is there already anything in Flink which can help to synchronize minimum timestamps between consumers?

4. Is there any good approach to slow down consumption in a Kafka consumer? I think there would be some problems between the session timeout and poll(), or something related to that, but maybe there is already a good solution for it :)

I will be glad if somebody can give some hints on any of these questions.

Best,
Sasha

Re: Kafka consumers are too fast for some partitions in "flatMap"-like jobs

Elias Levy
How many partitions does the output topic have?  If it has the same number of partitions as the input topic (30), have you considered simply using a custom partitioner for the Kafka sink that uses the input partition number as the output partition number?  If the input messages are ordered per input partition, that would guarantee their order in the output partitions.
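
Roughly like this sketch (against the Flink 1.3 FlinkKafkaPartitioner API; it assumes the record carries its source partition in a hypothetical getSourcePartition() field, which the job would have to populate while reading or mapping):

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Sends every record to the output partition with the same number as the
// input partition it came from, preserving per-partition order.
public class SamePartitionPartitioner extends FlinkKafkaPartitioner<MyEvent> {
    @Override
    public int partition(MyEvent record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Only a true 1:1 mapping if TOPIC_OUT has at least as many partitions as TOPIC_IN.
        return partitions[record.getSourcePartition() % partitions.length];
    }
}

You would pass an instance of it to the FlinkKafkaProducer010 constructor overload that accepts a custom partitioner.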

Re: Kafka consumers are too fast for some partitions in "flatMap"-like jobs

Oleksandr Baliev
Hi Elias, 

Thanks for the reply. TOPIC_OUT has fewer partitions, ~20, and actually there are 4 output topics with different numbers of partitions, so the job is a kind of router.

In general, having 1:1 partitions for the IN and OUT topics is good, thanks for the tip. But the main goal is to have windows in the next job, which consumes data from TOPIC_OUT: data in its partitions will be sequential per partition, but not well distributed (some partitions will have more late data than others). So when a window is applied and some data from one of the 30 partitions (if TOPIC_OUT is set up with 30 partitions) comes with a timestamp (watermark) that is outside the range given to the watermark extractor, the window will be closed and that data will be counted as late.

Take the following example with 2 partitions (not sure if the indentation will survive in your mail client, so sorry in advance):

PARTITION | TOPIC_IN data (timestamps) | TOPIC_OUT after mapping (timestamps)
1         | t5, t4, t3, t2, t1         | t5, t4, t3, t2, t1
2         | t5, t4, t3, t2, t1         | t3, t2, t1

I.e. there was some lag in the consumer/producer for partition 2, so its data arrives in TOPIC_OUT later.
It means that when another job reads from TOPIC_OUT, it will read p1t1 (the entry with timestamp t1 from partition 1) and the appropriate window will be created, i.e. w"t0-t1" (entries with timestamps from t0 to t1 are taken into account). Then the p1t2 entry comes, window w"t0-t1" is closed and window w"t1-t2" is created, and so on, until p2t1 arrives and is counted as late data. I think this should be a very common situation for such tasks. I don't take the BoundedOutOfOrdernessTimestampExtractor or allowedLateness parameters into account, because they can only tune these things; late data will come anyway.

Hm... probably I'm just not thinking about the problem in isolation, i.e. as "TOPIC_IN -> map job -> TOPIC_OUT -> another job with window". I think "TOPIC_OUT -> another job with window" should be separated from this pipeline and reviewed as a job with late data, which is a common question for parallel data sources where just some parts are slow.

So the main question is: how to synchronize reading between Kafka partitions when the data is sequential per partition but late for some of them, and we care that this data is not thrown away and will be fully processed for some time range (window) later? :) That is more about manually handling consumption at the Kafka fetch level, and FlinkKafka* is too high-level for that, isn't it?

Also, as I understand it, the watermark is the only thing in Flink which can somehow be applied to this task, since it's a global metric, but it's only used by the windowing mechanism. And that is more about the window assigner (how/when to create a window) and triggers (how/when to close a window); it cannot tell a consumer "please wait a bit, because I know that some data is still out there, so let's check all sources first and only then close me". Or maybe the assigner could somehow not create the window and just wait for some condition, but then again there is the problem of stopping the consumer. Eh...

Best,
Sasha

Re: Kafka consumers are too fast for some partitions in "flatMap"-like jobs

Elias Levy
On Wed, Aug 30, 2017 at 11:50 AM, Oleksandr Baliev <[hidden email]> wrote:

So the main question is: how to synchronize reading between Kafka partitions when the data is sequential per partition but late for some of them, and we care that this data is not thrown away and will be fully processed for some time range (window) later? :) That is more about manually handling consumption at the Kafka fetch level, and FlinkKafka* is too high-level for that, isn't it?


At some point you have to give up on late data and drop it if you are performing some window computation. That said, that could be a long time, allowing for very out-of-order data. Presumably most data won't be late, and you want to output preliminary results to have timely data. In that case you want to implement a window trigger that fires early at regular intervals without purging, if it has received new events since the last time it fired, and that purges the data once the allowed lateness time passes.

For instance, see this EventTimeTriggerWithEarlyAndLateFiring in Java or this simplified EarlyFiringEventTimeTrigger in Scala.
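
In the spirit of those examples, a rough sketch of such a trigger (my own simplification rather than the linked code; the allowedLateness value must match what the WindowedStream uses, and the early-firing interval is processing time):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EarlyAndLateFiringTrigger extends Trigger<Object, TimeWindow> {
    private final long intervalMillis;        // early-firing period (processing time)
    private final long allowedLatenessMillis; // must match WindowedStream#allowedLateness

    private final ReducingStateDescriptor<Long> newEventsDesc =
        new ReducingStateDescriptor<>("new-events", new Sum(), Long.class);

    public EarlyAndLateFiringTrigger(long intervalMillis, long allowedLatenessMillis) {
        this.intervalMillis = intervalMillis;
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp()); // on-time firing
        ctx.registerEventTimeTimer(window.maxTimestamp() + allowedLatenessMillis); // final purge
        ReducingState<Long> newEvents = ctx.getPartitionedState(newEventsDesc);
        if (newEvents.get() == null) {
            // first new element since the last early firing: schedule the next one
            ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + intervalMillis);
        }
        newEvents.add(1L);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
                                          TriggerContext ctx) throws Exception {
        ReducingState<Long> newEvents = ctx.getPartitionedState(newEventsDesc);
        if (newEvents.get() != null) {
            newEvents.clear();
            return TriggerResult.FIRE; // early result, window contents are kept
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window,
                                     TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp() + allowedLatenessMillis) {
            return TriggerResult.FIRE_AND_PURGE; // lateness exhausted: emit and drop state
        }
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.deleteEventTimeTimer(window.maxTimestamp() + allowedLatenessMillis);
        ctx.getPartitionedState(newEventsDesc).clear();
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}

Late elements within the lateness period re-register the already-expired end-of-window timer, so each of them produces an immediate late firing with the updated window contents.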