Stale watermark due to unconsumed Kafka partitions

Stale watermark due to unconsumed Kafka partitions

Eduardo Winpenny Tejedor
Hi all,

It was a bit tricky to figure out what was going wrong here, hopefully someone can add the missing piece to the puzzle.

I have a Kafka source with a custom AssignerWithPeriodicWatermarks timestamp assigner. It's a copy of the AscendingTimestampExtractor with a log statement printing each timestamp and watermark produced (along with the hash of the assigner instance, so I know exactly how far each substream has progressed). Attached to that is a JDBCSinkFunction. I have set the whole plan's parallelism to 1 and the max parallelism to 1 as well.
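
Roughly, the wiring looks like this (a sketch rather than my exact code; Event, EventDeserializationSchema, getEventTime, kafkaProps and jdbcSink are placeholders, and the assigner shown is just my logging copy of AscendingTimestampExtractor boiled down):

import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env.setMaxParallelism(1);

Properties kafkaProps = new Properties();   // bootstrap.servers, group.id, ...

FlinkKafkaConsumer<Event> consumer =
        new FlinkKafkaConsumer<>("events", new EventDeserializationSchema(), kafkaProps);

// The logging "ascending" assigner, attached to the consumer itself.
consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
    private long currentMax = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        long ts = element.getEventTime();                       // placeholder accessor
        currentMax = Math.max(currentMax, ts);
        System.out.printf("assigner=%d timestamp=%d%n", System.identityHashCode(this), ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        long wm = (currentMax == Long.MIN_VALUE) ? Long.MIN_VALUE : currentMax - 1;
        System.out.printf("assigner=%d watermark=%d%n", System.identityHashCode(this), wm);
        return new Watermark(wm);
    }
});

env.addSource(consumer)
   .addSink(jdbcSink);   // jdbcSink: the JDBC sink described above (construction omitted)

env.execute();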

My first surprise was to see there are 16 instances of my assigner created, despite there being only one thread using all 16.

My second surprise was to see there were only 4 assigner instances that were extracting timestamps.

This meant the whole job's watermark wasn't advancing (and while that's not important in this simplified example, it is in my real-life use case).

If I replace my JDBC sink with a print sink, though, all 16 assigners get fully used (i.e. they all receive messages from which they have to extract a timestamp).

What is happening here? I don't want to ignore the unconsumed Kafka partitions or mark them as idle, because I know from using the print sink that they do have messages in them. I'm also surprised that there are 16 instances of the assigner (one per Kafka partition) even though the parallelism of the job is 1 - is that a conscious decision, and if so, what's the reason?

Finally, I'd also like to know why only 4 assigners are effectively being used - I suspect it's a JDBC default I can override somehow.

Thanks for getting to the bottom of this!

Re: Stale watermark due to unconsumed Kafka partitions

Stephan Ewen
You can use the Timestamp Assigner / Watermark Generator in two different ways: Per Kafka Partition or per parallel source.

I would usually recommend per Kafka partition, because if the read position in the partitions drifts apart (for example, some partitions are read at the tail while others are read a few minutes behind), then your watermarks can easily get messed up if you do not track them per partition.
There is one instance of the assigner per partition, because its state might be different for each partition (like "highest seen timestamp so far" or "millis since last activity").
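
In code, the difference looks roughly like this (a sketch; MyAssigner stands in for your AssignerWithPeriodicWatermarks, and the consumer arguments are abbreviated):

// (a) Per Kafka partition: the assigner is attached to the consumer itself.
//     One assigner instance per partition; the source tracks a watermark per
//     partition and emits the minimum across them.
FlinkKafkaConsumer<Event> consumer =
        new FlinkKafkaConsumer<>("events", deserializer, kafkaProps);
consumer.assignTimestampsAndWatermarks(new MyAssigner());
DataStream<Event> perPartition = env.addSource(consumer);

// (b) Per parallel source: the assigner is attached to the stream after the source.
//     One assigner instance per source subtask, regardless of how many partitions
//     that subtask reads.
DataStream<Event> perSubtask = env
        .addSource(new FlinkKafkaConsumer<>("events", deserializer, kafkaProps))
        .assignTimestampsAndWatermarks(new MyAssigner());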

Why this behaves differently with a JDBC sink versus simply printing is strange. Does the JDBC sink alter the parallelism or block some parts of the pipeline?


Re: Stale watermark due to unconsumed Kafka partitions

Eduardo Winpenny Tejedor
OK, that makes sense. I can see how sharing the same assigner instance between partitions could be tricky, especially if you're handling state.

Good to hear this is not expected behaviour. I'm using Flink's JDBCAppendTableSink, so nothing particularly unconventional (and I get the same result with JDBCOutputFormat). I'll try to investigate further and report back, possibly with a code example if I don't get to the bottom of it.
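
For completeness, the sink is built roughly like this (the driver, URL, query and parameter types below are placeholders, not my actual values). One JDBC default I'll look at is the batch size, since the output format buffers rows and only executes them as a batch:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
        .setDrivername("org.postgresql.Driver")                        // placeholder driver
        .setDBUrl("jdbc:postgresql://localhost:5432/mydb")             // placeholder URL
        .setQuery("INSERT INTO events (id, event_time) VALUES (?, ?)") // placeholder query
        .setParameterTypes(Types.LONG, Types.SQL_TIMESTAMP)
        .setBatchSize(1)   // flush every row while debugging, instead of the buffered default
        .build();
// The sink is then attached to a DataStream<Row>, or registered as a table sink.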

Thanks,
Eduardo
