Experiencing different watermarking behaviour in local/standalone modes when reading from kafka topic with idle partitions


Experiencing different watermarking behaviour in local/standalone modes when reading from kafka topic with idle partitions

Salva Alcántara
I am considering this watermarker:

```scala
class MyWatermarker(val maxTimeLag: Long = 0)
    extends AssignerWithPeriodicWatermarks[MyEvent] {
  var maxTs: Long = 0

  override def extractTimestamp(e: MyEvent, previousElementTimestamp: Long): Long = {
    val timestamp = e.timestamp
    maxTs = maxTs.max(timestamp)
    timestamp
  }

  override def getCurrentWatermark: WatermarkOld = {
    println(s"event watermark: ${maxTs - maxTimeLag}")
    new WatermarkOld(maxTs - maxTimeLag)
  }
}
```

The underlying events come from a Kafka source and are then handed to a
process function. Its implementation is irrelevant to the question, so I will
just share the relevant bit:

```scala
  override def processElement(
    event: MyEvent,
    ctx: KeyedProcessFunction[String, MyEvent, MyEvent]#Context,
    out: Collector[MyEvent]
  ): Unit = {
    println(
      s"In process function, got event: $event, ctx.timestamp: ${ctx.timestamp}, " +
        s"currentWatermark: ${ctx.timerService.currentWatermark}"
    )
    // ...
  }
```

When I run this app on a real Kubernetes cluster, with a Kafka source topic
that has idle partitions, the watermark is held back at 0 as expected:

```
In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 0
```

I can also see these logs generated by the watermarker:

```
event watermark: 1601475710619
event watermark: 0
event watermark: 1601475710619
event watermark: 0
```

The funny thing is that when I run the same application locally in IntelliJ,
also with idle Kafka partitions for the same topic, I get the same logs from
the watermarker, with the watermark oscillating between 0 and the timestamp
of the latest received element (since `maxTimeLag = 0`). However, quite
unexpectedly for me, the logs from the process function show that the
watermark is nonetheless advancing:

```
In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 1601475710618
```

Why is this happening? FYI, I am using Flink 1.10 with the environment
parallelism set to 2 and event-time semantics in both cases.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Experiencing different watermarking behaviour in local/standalone modes when reading from kafka topic with idle partitions

David Anderson-3
If you were to use per-partition watermarking, which you can do by calling assignTimestampsAndWatermarks directly on the Flink Kafka consumer [1], then I believe the idle partition(s) would consistently hold back the overall watermark.
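For concreteness, attaching the assigner to the consumer itself (rather than to the resulting stream) looks roughly like this in Flink 1.10; the topic name, deserialization schema, and properties below are placeholders, and `MyEventDeserializer` is a hypothetical `DeserializationSchema[MyEvent]`:

```scala
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092") // placeholder

val consumer = new FlinkKafkaConsumer[MyEvent](
  "my-topic",                // placeholder topic
  new MyEventDeserializer(), // hypothetical DeserializationSchema[MyEvent]
  props
)

// Watermarking is applied inside the source, separately per Kafka partition:
consumer.assignTimestampsAndWatermarks(new MyWatermarker())

val stream: DataStream[MyEvent] = env.addSource(consumer)
```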

With per-partition watermarking, each Kafka source task will apply the watermarking separately to each partition it is handling, and then emit as its watermark the minimum of those per-partition watermarks. At least one of the Kafka source tasks will therefore have a watermark of 0, and assuming you have a keyBy after the watermarking and before the process function, that will hold back the watermark at the process function.
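To make the arithmetic concrete, here is a tiny plain-Scala sketch (no Flink involved) of what a source task effectively computes under per-partition watermarking; the timestamps are taken from the logs above, and partition 1 stands in for an idle partition:

```scala
object PerPartitionWatermarkDemo extends App {
  val maxTimeLag = 0L

  // Per-partition max event timestamps tracked by one source task.
  // Partition 1 is idle: no events arrive, so its maxTs stays at the
  // initial value 0, just like `maxTs` in MyWatermarker.
  val perPartitionMaxTs = Seq(1601475710619L, 0L)

  // Each partition gets its own watermark, as in getCurrentWatermark...
  val perPartitionWatermarks = perPartitionMaxTs.map(_ - maxTimeLag)

  // ...and the task emits the minimum of them as its overall watermark.
  val emitted = perPartitionWatermarks.min

  assert(emitted == 0L) // the idle partition holds the watermark back at 0
  println(s"emitted watermark: $emitted")
}
```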

Otherwise, if you apply watermarking to the output of the Kafka source tasks, then whether the watermarking tasks have a watermark of 0 or not depends on whether their corresponding Kafka source task has any non-idle partitions. If the assignment of partitions to instances isn't deterministic, this could explain why you are seeing different results in IntelliJ.

Note that the handling of idle sources has been reworked in Flink 1.11 [2], and bug fixes related to that are still pending [3]. 
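For the record, a rough sketch of the reworked 1.11 API, where idleness is opted into explicitly (the one-minute timeout is an arbitrary example value):

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

val strategy: WatermarkStrategy[MyEvent] = WatermarkStrategy
  .forBoundedOutOfOrderness[MyEvent](Duration.ofMillis(0))
  .withTimestampAssigner(new SerializableTimestampAssigner[MyEvent] {
    override def extractTimestamp(e: MyEvent, recordTimestamp: Long): Long = e.timestamp
  })
  // Partitions that produce no records for this long are marked idle
  // and stop holding back the overall watermark:
  .withIdleness(Duration.ofMinutes(1))
```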

Regards,
David


On Thu, Oct 1, 2020 at 12:10 PM Salva Alcántara <[hidden email]> wrote:

Re: Experiencing different watermarking behaviour in local/standalone modes when reading from kafka topic with idle partitions

Salva Alcántara
Awesome David, thanks for clarifying!




Re: Experiencing different watermarking behaviour in local/standalone modes when reading from kafka topic with idle partitions

Salva Alcántara
In reply to this post by David Anderson-3
So, regarding my question: can the behaviour I am observing be attributed to
actual differences between local (IntelliJ) and standalone modes, or is that
beside the point, and what I saw is just a coincidence due to the inherent
non-determinism you mention?


