I am considering this watermarker:
```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
// assuming WatermarkOld aliases the (pre-1.11) Watermark class
import org.apache.flink.streaming.api.watermark.{Watermark => WatermarkOld}

class MyWatermarker(val maxTimeLag: Long = 0) extends AssignerWithPeriodicWatermarks[MyEvent] {
  var maxTs: Long = 0

  override def extractTimestamp(e: MyEvent, previousElementTimestamp: Long): Long = {
    val timestamp = e.timestamp
    maxTs = maxTs.max(timestamp)
    timestamp
  }

  override def getCurrentWatermark: WatermarkOld = {
    println(s"event watermark: ${maxTs - maxTimeLag}")
    new WatermarkOld(maxTs - maxTimeLag)
  }
}
```

The underlying events come from a Kafka source and are then handed to a process function. Its implementation is irrelevant for the question, so I will just share the relevant bit:

```scala
override def processElement(
    event: MyEvent,
    ctx: KeyedProcessFunction[String, MyEvent, MyEvent]#Context,
    out: Collector[MyEvent]
): Unit = {
  println(
    s"In process function, got event: $event, ctx.timestamp: ${ctx.timestamp}, currentWatermark: ${ctx.timerService.currentWatermark}"
  )
  ...
}
```

When I run this app on a real Kubernetes cluster, with a Kafka source topic that has idle partitions, the watermark is held back at 0 as expected:

```
In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 0
```

I can also see these logs generated in the watermarker:

```
event watermark: 1601475710619
event watermark: 0
event watermark: 1601475710619
event watermark: 0
```

The funny thing is that when I run the same application locally from IntelliJ, also with idle Kafka partitions for the same topic, I get the same logs from the watermarker, with the watermark oscillating between 0 and the timestamp of the latest received element (since `maxTimeLag = 0`). However, quite unexpectedly for me, the logs from the process function show that the watermark is nevertheless advancing:

```
In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 1601475710618
```

Why is this happening? FYI, I am using Flink 1.10 with the environment parallelism set to 2 and event time semantics in both cases.
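In case it helps, the environment setup boils down to roughly this (just a sketch of the parallelism and time characteristic mentioned above, not the actual code):

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
// In Flink 1.10 this also sets the auto watermark interval to its 200 ms default,
// which is how often getCurrentWatermark above gets polled.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
```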
If you were to use per-partition watermarking, which you can do by calling assignTimestampsAndWatermarks directly on the Flink Kafka consumer [1], then I believe the idle partition(s) would consistently hold back the overall watermark. With per-partition watermarking, each Kafka source task applies the watermarking separately to each partition it is handling, and then emits as its watermark the minimum of those per-partition watermarks. At least one of the Kafka source tasks will therefore have a watermark of 0, and assuming you have a keyBy after the watermarking and before the process function, that will hold back the watermark at the process function.

Otherwise, if you apply watermarking to the output of the Kafka source tasks, then whether a given watermarking task has a watermark of 0 depends on whether its corresponding Kafka source task has any non-idle partitions. If the assignment of partitions to instances isn't deterministic, this could explain why you are seeing different results in IntelliJ.

Note that the handling of idle sources has been reworked in Flink 1.11 [2], and bug fixes related to that are still pending [3].
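To make the distinction concrete, the two setups would look roughly like this (a sketch against the 1.10 APIs; the topic name, deserialization schema, and properties are placeholders, not taken from your code, and `env` is assumed to be in scope):

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Placeholders for illustration only: adjust topic, schema, and properties to your setup.
val kafkaProps = new Properties()
val schema: DeserializationSchema[MyEvent] = ??? // your MyEvent deserializer

// Option A: per-partition watermarking. The assigner runs inside the Kafka source,
// once per partition, and each source task emits the minimum of its per-partition
// watermarks, so an idle partition reliably holds the watermark back.
val consumer = new FlinkKafkaConsumer[MyEvent]("my-topic", schema, kafkaProps)
consumer.assignTimestampsAndWatermarks(new MyWatermarker(maxTimeLag = 0))
val perPartition: DataStream[MyEvent] = env.addSource(consumer)

// Option B: watermarking downstream of the source. Each watermarking task only sees
// the output of its source task, so its watermark depends on which partitions that
// particular source task happens to own, hence the non-deterministic behaviour.
val downstream: DataStream[MyEvent] = env
  .addSource(new FlinkKafkaConsumer[MyEvent]("my-topic", schema, kafkaProps))
  .assignTimestampsAndWatermarks(new MyWatermarker(maxTimeLag = 0))
```

Regards,
David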
Awesome David, thanks for clarifying!
In reply to this post by David Anderson-3
So, regarding my question: can the behaviour I am observing be attributed to actual differences between local (IntelliJ) and standalone modes, or is that beside the point, and it is more of a coincidence caused by the inherent non-determinism you mention?