Hello Users,

I have a question that is perhaps not best solved within Flink: it has to do with notifying a downstream application that a Flink window has completed.

The (simplified) scenario is this:
- We have a Flink job that consumes from Kafka, does some preprocessing, and then applies a sliding window with a size of 10 minutes and a slide of 1 minute.
- The number of keys in each slide is not fixed.
- The output of the window is written to Kafka, where it is read by a downstream application.

What I want to achieve is that the downstream application can somehow know when it has read all of the data for a single window, without waiting for the next window to arrive.

Some options I've considered:
- Producing a second window over the window results that counts the output size, which the downstream application can then use to check when it has received the same number of records: This seems fragile, as it relies on there being no loss or duplication of data. It's also an extra window and an extra Kafka stream, which is a tad messy.
- Somehow adding an 'end of window' element to each partition of the Kafka topic, which can be read by the consumer: This seems a bit messy because it mixes different types of events in the same Kafka stream, and there is no really simple way to do this in Flink.
- Packaging the whole window output into a single message and making this the unit of transaction: This is possible, but the message would then be quite large (at least tens of MB), as the volume of this stream is quite large.
- Assuming that if the consumer has no elements to read, or if it has started reading the next window, then it has read the whole window: This seems reasonable, and if my consumer on the application side weren't a bit inflexible right now, it is probably the solution I would use.

Any further/better ideas?

Thanks
Padarn
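To make the window arithmetic in this scenario concrete, here is a small plain-Java sketch (no Flink dependency) of how one event timestamp maps onto 10-minute windows that slide every minute. It is meant to mirror the start alignment used by Flink's SlidingEventTimeWindows assigner with a zero offset; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of sliding event-time window assignment (offset assumed to be 0).
final class SlidingWindowMath {

    /** Half-open window [start, end), times in epoch milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Timestamp carried by the window's result records: end - 1 (the window's max timestamp). */
        long resultTimestamp() {
            return end - 1;
        }
    }

    /** Start of the newest slide-aligned window that contains the timestamp. */
    static long lastWindowStart(long timestamp, long slide) {
        return timestamp - Math.floorMod(timestamp, slide);
    }

    /** All windows of the given size and slide that contain the timestamp, newest first. */
    static List<Window> windowsFor(long timestamp, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        for (long start = lastWindowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }
}
```

For an event at 2,730,000 ms (45.5 minutes after the epoch), `windowsFor(2_730_000L, 600_000L, 60_000L)` returns 10 windows, the newest being [2,700,000, 3,300,000). Each of those window results is stamped with its end minus 1 ms, which is the timestamp a downstream "window complete" signal would have to refer to.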
Hi Padarn,

What you describe is essentially publishing Flink's watermarks to an outside system.
Flink processes time windows by waiting for a watermark that is past the window end time. When it receives such a WM, it processes and emits all ended windows and forwards the watermark.
When a sink receives a WM for, say, 12:45:15, you know that all window results up to 12:45:00 have been emitted.
Hence, the watermark tells you about the completeness of the data.

However, using this information is not so easy, mostly because of the failure semantics.
Things become much easier if you produce to Kafka with exactly-once semantics.

In that case, you could have a ProcessFunction that is chained before the sink and which counts the window results per time slice and, when the watermark passes, emits the result to a side output.
All side output messages are collected by a single task and can be published to a Kafka topic or even be made available via Queryable State.

For at-least-once output, it's much harder because you'll have duplicates in the output after a job recovers.

Best, Fabian

I think you have two options to let the consuming app know about the progress.
You can either

The ProcessFunction could count per window end timestamp how many records passed and forward that information via a side output.
You could then

Essentially, you'd like to publish Flink's watermarks to an outside system (possibly via Kafka).

On Mon, Aug 12, 2019 at 14:33, Padarn Wilson <[hidden email]> wrote:
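The counting idea above can be sketched in plain Java, with `onRecord` and `onWatermark` standing in for the callbacks a ProcessFunction would receive (these names and the `Summary` type are illustrative, not Flink API). Results are counted per window timestamp, and each watermark releases a summary for every window it has passed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: count window results per window timestamp; a watermark releases the counts
// of all windows whose timestamp it has passed (timestamp <= watermark).
final class WindowResultCounter {

    /** Summary that would be sent to the side output for one completed window. */
    static final class Summary {
        final long windowTimestamp;
        final long count;

        Summary(long windowTimestamp, long count) {
            this.windowTimestamp = windowTimestamp;
            this.count = count;
        }
    }

    // Pending counts, ordered by window timestamp.
    private final TreeMap<Long, Long> pending = new TreeMap<>();

    /** Called for every window result that passes on its way to the sink. */
    void onRecord(long windowTimestamp) {
        pending.merge(windowTimestamp, 1L, Long::sum);
    }

    /** Called when a watermark arrives; returns summaries for every window it completes. */
    List<Summary> onWatermark(long watermark) {
        List<Summary> completed = new ArrayList<>();
        // headMap is a live view of all entries with key <= watermark.
        Map<Long, Long> done = pending.headMap(watermark, true);
        for (Map.Entry<Long, Long> e : done.entrySet()) {
            completed.add(new Summary(e.getKey(), e.getValue()));
        }
        done.clear(); // clearing the view removes those entries from pending
        return completed;
    }
}
```

A window that produced no results yields no summary, so a consumer would also need the watermark itself to tell an empty window apart from one that is not yet complete.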
Hi Fabian, thanks for your input.

Exactly. Actually, my first instinct was to see if it was possible to publish the watermarks somehow. My initial idea was to insert regular watermark messages into each partition of the stream, but exposing this seemed quite troublesome.

> In that case, you could have a ProcessFunction that is chained before the sink and which counts the window results per time slice and emits the result when the watermark passes to a side output. All side output messages are collected by a single task and can be published to a Kafka topic or even be made available via Queryable State.

I understand the idea here (and exactly-once semantics are probably fine for my use case), but counting events seems a bit fragile. I'm not totally confident the consumer can guarantee it won't read duplicates (it's a Go Kafka library that seems to have some quirks).

I think ideally each partition of the Kafka topic would carry some regular information about watermarks. Perhaps the Kafka producer could be modified to support this.

Padarn

On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske <[hidden email]> wrote:
Hi Padarn,

Yes, this is quite tricky.
The "problem" with watermarks is that you need to consider how you write to Kafka.
If your Kafka sink writes to a keyed Kafka stream (each Kafka partition is written by multiple producers), you need to broadcast the watermarks to each partition, i.e., each partition receives watermarks from every parallel sink task. So in order to reason about the current watermark of a partition, you need to observe them all and take the minimum WM across all current sink task WMs.
Things become much easier if each partition is written by only a single task, but this also means that the data is not key-partitioned in Kafka.
In that case, the sink task only needs to write a WM message to each of its assigned partitions.

Hope this helps,
Fabian

On Sat, Aug 17, 2019 at 05:48, Padarn Wilson <[hidden email]> wrote:
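A consumer-side sketch of the keyed case described above, in plain Java with hypothetical names. It assumes every sink task writes marker messages carrying (sink task index, watermark) into every partition, and that the consumer knows the sink parallelism. A partition's watermark is the minimum of the latest marker from each sink task, and it stays at `Long.MIN_VALUE` until every task has reported.

```java
import java.util.Arrays;

// Sketch: per-partition watermark = minimum over the latest marker from each sink task.
final class PartitionWatermarkTracker {

    private final long[][] latest; // latest[partition][sinkTask]

    PartitionWatermarkTracker(int partitions, int sinkTasks) {
        latest = new long[partitions][sinkTasks];
        for (long[] row : latest) {
            Arrays.fill(row, Long.MIN_VALUE); // no marker seen yet
        }
    }

    /** Records a watermark marker read from a partition; watermarks never move backwards. */
    void onMarker(int partition, int sinkTask, long watermark) {
        latest[partition][sinkTask] = Math.max(latest[partition][sinkTask], watermark);
    }

    /** Minimum watermark across all sink tasks writing to this partition. */
    long partitionWatermark(int partition) {
        long min = Long.MAX_VALUE;
        for (long wm : latest[partition]) {
            min = Math.min(min, wm);
        }
        return min;
    }

    /** A window result timestamp at or below the partition watermark is complete on that partition. */
    boolean isComplete(int partition, long windowTimestamp) {
        return windowTimestamp <= partitionWatermark(partition);
    }
}
```

With 3 sink tasks, a partition that has seen markers 120,000, 180,000 and 150,000 reports a watermark of 120,000, so results stamped 119,999 are complete there while results stamped 179,999 are not.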
Hi again Fabian,

Thanks for pointing this out to me. In my case there is no need for keyed writing, but I do wonder whether having each Kafka task write to only a single partition would significantly affect performance.

Actually, now that I think about it, the approach of just waiting for the first records of the next window is also subject to the problem you mention above: a producer lagging behind the rest could leave a partition containing elements out of ‘window order’.

I was also thinking that this problem is very similar to that of checkpoint barriers. I intend to dig into the details of the exactly-once Kafka sink for some inspiration.

Padarn

On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske <[hidden email]> wrote:
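The failure of the "first record of the next window" heuristic can be seen with a tiny simulation. This plain-Java sketch (hypothetical names) scans the window timestamps in one partition's log, in order, and reports every record that arrives after a higher window timestamp has already been seen, i.e. after the heuristic would already have declared that record's window complete.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: apply "a newer window's record means older windows are done" to one partition log.
final class NextWindowHeuristic {

    /** Returns the window timestamps of records that arrived after their window was declared complete. */
    static List<Long> lateRecords(List<Long> partitionLog) {
        List<Long> late = new ArrayList<>();
        long highestSeen = Long.MIN_VALUE;
        for (long ts : partitionLog) {
            if (ts < highestSeen) {
                // The heuristic already treated every window below highestSeen as complete.
                late.add(ts);
            }
            highestSeen = Math.max(highestSeen, ts);
        }
        return late;
    }
}
```

For a log of 59,999, 59,999, 119,999, 59,999, 119,999, where the fourth record comes from a lagging producer, the sketch reports one late record for window 59,999.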
Hi,
I'll chip in with an approach I'm trying at the moment that seems to work, and I say "seems" because I'm only running it on a personal project.

Personally, I don't have anything against end-of-message markers per partition, but Padarn, you seem not to prefer this option because it overloads the meaning of the output payload. My approach works equally well when producing watermarks/end-of-message markers on a side output, though.

The main problem with both approaches is knowing when the window has finished across all partitions without having to wait for the start of the next window.

I've taken the approach of sending all output messages of the window to 1. the sink, and also 2. a single-task operator. The single-task operator registers an event-time timer at the end time of the window. You can be confident that the task's timer triggers only once, at the right time, because all the post-window watermarks go through the same task. At that point I make the task send an end-of-message marker to every partition. I don't need to send the count because Kafka messages are ordered within a partition. And if you prefer not to overload the semantics of your original Kafka topic, you can post the marker to a separate location of your choice.

While this does mean that the end-of-message marker only gets sent once the window has finished across all substreams (as opposed to per substream), it also means you don't need to wait for the next window to start, and the watermark gap between substreams should never grow that large anyway.

This approach should be particularly useful when the number of partitions or the keying mechanism differs between the input and output topics.

Hopefully that doesn't sound like a terrible idea.

eduardo

On Wed, 28 Aug 2019, 02:54 Padarn Wilson, <[hidden email]> wrote:
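The single-task timer logic described above can be simulated in plain Java. In this sketch (hypothetical names), a `TreeSet` stands in for Flink's timer service, which likewise keeps a single timer per key and timestamp, and `advanceWatermark` fires every timer at or below the new watermark, producing one marker per output partition. It models only the marker task, not the separate data sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Sketch: one task sees every window timestamp, keeps one timer per distinct timestamp,
// and emits an end-of-window marker for each output partition when a timer fires.
final class EndOfWindowMarkerTask {

    /** End-of-window marker destined for one output partition. */
    static final class Marker {
        final long windowTimestamp;
        final int partition;

        Marker(long windowTimestamp, int partition) {
            this.windowTimestamp = windowTimestamp;
            this.partition = partition;
        }
    }

    private final TreeSet<Long> timers = new TreeSet<>(); // duplicates coalesce
    private final int partitions;

    EndOfWindowMarkerTask(int partitions) {
        this.partitions = partitions;
    }

    /** Registers an event-time "timer" for the window timestamp of an incoming result. */
    void processElement(long windowTimestamp) {
        timers.add(windowTimestamp);
    }

    /** Fires all timers with timestamp <= watermark, in order, one marker per partition each. */
    List<Marker> advanceWatermark(long watermark) {
        List<Marker> markers = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long ts = timers.pollFirst();
            for (int p = 0; p < partitions; p++) {
                markers.add(new Marker(ts, p));
            }
        }
        return markers;
    }
}
```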
Hi Padarn,

Regarding your throughput concerns: a sink task may write to multiple partitions, but each partition may only be written by a single task.

@Eduardo: Thanks for sharing your approach! I'm not sure I understood it correctly, but I think the approach does not guarantee that all results of a window are emitted before the end-of-window marker is written.
Since the sink operator and the single-task operator are separate operators, the output records might get stuck (or be buffered) in one of the sink tasks, and the single task would still emit an end-of-window marker record because it doesn't know about the sink task.

Best,
Fabian

On Thu, Aug 29, 2019 at 18:42, Eduardo Winpenny Tejedor <[hidden email]> wrote:
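One way to satisfy "each partition may only be written by a single task" is a fixed ownership rule. The plain-Java sketch below makes partition p the property of sink task p % parallelism; that rule is a hypothetical choice for illustration, not what Flink's Kafka producer does by default. Each task would then write its watermark markers only to the partitions it owns.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical ownership rule: partition p is written only by sink task p % parallelism.
final class SinglePartitionWriter {

    /** The only sink task allowed to write the given partition. */
    static int ownerOf(int partition, int parallelism) {
        return partition % parallelism;
    }

    /** Partitions a given sink task owns; it writes its WM markers to exactly these. */
    static List<Integer> partitionsOwnedBy(int task, int partitions, int parallelism) {
        List<Integer> owned = new ArrayList<>();
        for (int p = task; p < partitions; p += parallelism) {
            owned.add(p);
        }
        return owned;
    }
}
```

With 6 partitions and a parallelism of 4, task 1 owns partitions 1 and 5. With a single partition, only task 0 owns anything, so under this rule every record for that partition would have to reach task 0.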
Hi Fabian,

> but each partition may only be written by a single task

Sorry, I think I'm misunderstanding something here then: if I have a topic with one partition but multiple sink tasks (i.e., parallelism > 1), does this mean the data must all be shuffled to the single task writing that partition?

Padarn

On Mon, Sep 2, 2019 at 9:52 PM Fabian Hueske <[hidden email]> wrote:
Hi all,
I'll illustrate my approach with an example, as it is definitely unorthodox. Here's some sample code. It works for me... I hope there are no (obvious) flaws!

    // Imports for the snippet below (assuming the Flink 1.9-era universal Kafka connector)
    import java.util.Optional;
    import java.util.Properties;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.clients.producer.KafkaProducer;

    // myStream should be a stream of objects associated with a timestamp. The idea is to create a Flink app that
    // sends each object to Kafka, with the ability to also send an extra end-of-stream message after all events
    // for the same associated timestamp have been sent.
    final SingleOutputStreamOperator<Tuple2<Long, Object>> myStream = null; // replace with actual stream
    final String KAFKA_SINK_TOPIC = "OUTPUT_TOPIC";
    final SinkFunction<Tuple2<Long, Object>> singleObjectKafkaSink = null; // replace with...
    // needs to take care of mapping a timestamp into an end-of-stream marker record
    final KeyedSerializationSchema<Tuple2<Long, Integer>> serializationSchema = null; // replace with...
    final Properties producerProperties = null; // replace with...
    final int kafkaProducerPoolSize = 6; // for example...

    // sinks every event of a window
    myStream.addSink(singleObjectKafkaSink);

    // sends one end-of-stream message per Kafka partition
    myStream
        .map(item -> item.f0) // keep only the time
        .keyBy(item -> 1) // forces every event to go to the same task
        .process(new KeyedProcessFunction<Integer, Long, Tuple2<Long, Integer>>() {
            // plain Kafka client, only used to look up the partitions of the output topic
            private transient KafkaProducer<String, Object> kafkaProducer;

            @Override
            public void open(Configuration parameters) throws Exception {
                kafkaProducer = new KafkaProducer<>(producerProperties);
            }

            @Override
            public void close() throws Exception {
                if (kafkaProducer != null) {
                    kafkaProducer.close();
                }
            }

            @Override
            public void processElement(Long timestamp, Context ctx, Collector<Tuple2<Long, Integer>> out) throws Exception {
                // timer coalescing avoids firing more than once per timestamp
                ctx.timerService().registerEventTimeTimer(timestamp);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Long, Integer>> out) throws Exception {
                // fires once the watermark reaching this task passes the timestamp
                // send one event per partition downstream
                kafkaProducer.partitionsFor(KAFKA_SINK_TOPIC).forEach(partitionInfo ->
                    out.collect(new Tuple2<>(timestamp, partitionInfo.partition())));
            }
        })
        .addSink(new FlinkKafkaProducer<Tuple2<Long, Integer>>(
            KAFKA_SINK_TOPIC,
            serializationSchema,
            producerProperties,
            Optional.of(new FlinkKafkaPartitioner<Tuple2<Long, Integer>>() {
                @Override
                public int partition(Tuple2<Long, Integer> record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
                    return record.f1; // send to the partition it was designed to be sent to
                }
            }),
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
            kafkaProducerPoolSize));

On Mon, Sep 2, 2019 at 5:07 PM Padarn Wilson <[hidden email]> wrote:
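On the consuming side, the end-of-window markers from the code above could be tracked with something like the following plain-Java sketch (hypothetical names). It assumes partitions are numbered 0 to n-1, and that once a partition's marker for a window timestamp has been read, no more results for that window will follow in that partition. That relies on Kafka's per-partition ordering and on the data records having been written before the marker, which is the caveat Fabian raised earlier in the thread. Duplicate markers, e.g. after a replay, do not re-trigger completion.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch: a window is complete once its end-of-window marker has been read from every partition.
final class WindowMarkerConsumer {

    private final int partitions;
    private final Map<Long, Set<Integer>> markersSeen = new HashMap<>();

    WindowMarkerConsumer(int partitions) {
        this.partitions = partitions;
    }

    /** Returns true exactly once per window: when the last missing partition marker arrives. */
    boolean onMarker(long windowTimestamp, int partition) {
        Set<Integer> seen = markersSeen.computeIfAbsent(windowTimestamp, k -> new HashSet<>());
        boolean wasComplete = seen.size() == partitions;
        seen.add(partition); // duplicate markers leave the set unchanged
        return !wasComplete && seen.size() == partitions;
    }

    boolean isComplete(long windowTimestamp) {
        Set<Integer> seen = markersSeen.get(windowTimestamp);
        return seen != null && seen.size() == partitions;
    }
}
```

A long-running consumer would also evict entries for old windows; the sketch keeps them to stay short.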