How to access state in TimestampAssigner in Flink 1.11?

Theo
Hi there,

Right now I'm in the process of upgrading our Flink 1.9 jobs to Flink 1.11.

In Flink 1.9, I was able to write an AssignerWithPeriodicWatermarks that also extended AbstractRichFunction and could thus use state and getRuntimeContext(). This worked because TimestampsAndWatermarksOperator was an AbstractUdfStreamOperator and received my assigner as its userFunction.

I used this feature for some "per partition processing", which Flink isn't ideally suited for at the moment, I guess. We have ascending watermarks per Kafka partition and do some processing on that. In order to maintain state per Kafka partition, I now keyBy the Kafka partition in our stream (not ideal, but better than operator state in terms of rescaling). Afterwards, however, I need to emulate the watermark strategy of the initial Kafka source, i.e. reassign watermarks the same way the Kafka source did (per Kafka partition within the operator). Via getRuntimeContext(), I am/was able to identify the Kafka partitions one operator instance was responsible for and could produce the output watermark accordingly (the minimum over all responsible partitions).
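
For illustration, the "minimum over all responsible partitions" logic can be sketched in plain Java without any Flink dependencies. PerPartitionWatermarks and its methods are hypothetical names, not Flink API; in a real operator the set of assigned partitions would have to be derived from the runtime context, which is not shown here. Treating a partition's watermark as "max timestamp minus 1" is an assumption for ascending timestamps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper, not part of Flink: tracks the highest timestamp seen per
 * Kafka partition and combines them into one watermark for an operator instance.
 */
public class PerPartitionWatermarks {
    // Highest timestamp seen per assigned partition; Long.MIN_VALUE means "no data yet".
    private final Map<Integer, Long> maxTimestamp = new HashMap<>();

    public PerPartitionWatermarks(Set<Integer> assignedPartitions) {
        for (int partition : assignedPartitions) {
            maxTimestamp.put(partition, Long.MIN_VALUE);
        }
    }

    /** Records an element's timestamp for one of the assigned partitions. */
    public void onEvent(int partition, long timestamp) {
        if (!maxTimestamp.containsKey(partition)) {
            throw new IllegalArgumentException("partition not assigned: " + partition);
        }
        maxTimestamp.merge(partition, timestamp, Long::max);
    }

    /**
     * Minimum over all assigned partitions, using (max timestamp - 1) per partition
     * (assumption: ascending timestamps). Until every assigned partition has
     * produced data, no progress is reported.
     */
    public long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long ts : maxTimestamp.values()) {
            if (ts == Long.MIN_VALUE) {
                return Long.MIN_VALUE;
            }
            min = Math.min(min, ts - 1);
        }
        return maxTimestamp.isEmpty() ? Long.MIN_VALUE : min;
    }
}
```

With partitions {0, 1} assigned, events at 1000 and 2000 on partition 0 and at 500 on partition 1 yield a watermark of 499: the slowest partition holds the operator back, as it would in the Kafka source.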

In Flink 1.11, how can I rebuild this behavior? Do I really need to build my own TimestampsAndWatermarksOperator which works like the old one? Or is there a better approach?

Best regards
Theo

Re: How to access state in TimestampAssigner in Flink 1.11?

Till Rohrmann
Hi Theo,

thanks for reaching out to the community. I am pulling in Aljoscha and Klou, who have worked on the new WatermarkStrategy/WatermarkGenerator abstraction and might be able to help you with your problem. At the moment, it looks to me like there is no way to combine state with the new WatermarkGenerator abstraction.

Cheers,
Till

On Wed, Aug 19, 2020 at 3:37 PM Theo Diefenthal <[hidden email]> wrote:
> [...]

Re: How to access state in TimestampAssigner in Flink 1.11?

Aljoscha Krettek
Hi,

sorry for the inconvenience! I'm sure we can find a solution together.

Why do you need to keep state in the Watermark Assigner? The Kafka
source will by itself maintain the watermark per partition, so just
specifying a WatermarkStrategy will already correctly compute the
watermark per partition and then combine them together.

Best,
Aljoscha

On 20.08.20 08:08, Till Rohrmann wrote:
> [...]

Re: How to access state in TimestampAssigner in Flink 1.11?

Theo
Hi Aljoscha,

We have a ProcessFunction that does some processing per Kafka partition. It basically buffers the incoming data for one minute and drops some events from the stream if another related event arrived within that minute.
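
To make the buffering step concrete, here is a minimal framework-free Java sketch of the behavior described above. It is a hypothetical model, not Theo's actual ProcessFunction: it assumes that two events are "related" when they share a relationKey (an invented field), that the earlier event is the one dropped, and that timestamps within a partition are ascending. flush() takes the current watermark and releases events whose one-minute window can no longer receive a related event.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical model of a per-partition buffer (not Flink API): events are held
 * for one minute, and an event is dropped if a related event follows within it.
 */
public class RelatedEventFilter {
    /** Assumed event shape: relationKey decides which events are "related". */
    public static final class Event {
        public final String relationKey;
        public final long timestamp;

        public Event(String relationKey, long timestamp) {
            this.relationKey = relationKey;
            this.timestamp = timestamp;
        }
    }

    static final long WINDOW_MS = 60_000L;

    // Buffered events in arrival order (ascending timestamps per partition assumed).
    private final List<Event> buffer = new ArrayList<>();

    /** Drops buffered events related to e that are at most WINDOW_MS older, then buffers e. */
    public void onEvent(Event e) {
        buffer.removeIf(b -> b.relationKey.equals(e.relationKey)
                && e.timestamp - b.timestamp <= WINDOW_MS);
        buffer.add(e);
    }

    /**
     * Releases events whose window has passed. The argument is treated as a
     * watermark: no later event may have a timestamp <= watermark, so an event
     * with timestamp + WINDOW_MS <= watermark can no longer be dropped.
     */
    public List<Event> flush(long watermark) {
        List<Event> out = new ArrayList<>();
        Iterator<Event> it = buffer.iterator();
        while (it.hasNext()) {
            Event b = it.next();
            if (b.timestamp + WINDOW_MS <= watermark) {
                out.add(b);
                it.remove();
            }
        }
        return out;
    }
}
```

Because records leave this buffer up to a minute after their timestamp, an upstream watermark forwarded unchanged would overtake records still held here, which is why the watermarks need to be reassigned after this step.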

In order to buffer the data and store the events for one minute, we perform a keyBy(kafkaPartition) and thus have keyed state per Kafka partition. The process function, however, messes up the watermarks (due to the buffering). Consequently, we have to reassign the watermarks, and we want to do it the same way the Kafka source assigner would have done. We can only do this if the timestamp assigner is aware of the subtask index.

I solved this by not using assignTimestampsAndWatermarks() but instead extending TimestampsAndWatermarksOperator and adding it to the DataStream via transform(). I don't like this solution much, but it meets our needs here.

Ideally, I would love to not need state here at all (and buffer in RAM only). I would really love to put the buffer logic directly into the KafkaConsumer so that there is nothing to store on a checkpoint (except for the Kafka offsets, which would be stored anyway, just different ones). But that's another topic; I will soon open a thread on the dev mailing list with a feature request for it.

Or do you have any other idea how to buffer the data per Kafka partition? We have the nice property that one Kafka partition receives data from one of our servers, so we have ascending timestamps per partition and can guarantee that related events always arrive in the same partition. I think that's a rather common use case in Flink, and exploiting it can reduce latency a lot, so I would love to see more features directly in Flink that better support "processing per Kafka partition" without the need to shuffle.

Best regards
Theo

----- Original Message -----
From: "Aljoscha Krettek" <[hidden email]>
To: "user" <[hidden email]>
Sent: Monday, 7 September 2020 11:07:35
Subject: Re: How to access state in TimestampAssigner in Flink 1.11?

[...]

Re: How to access state in TimestampAssigner in Flink 1.11?

Aljoscha Krettek
Hi Theo,

I think you're right that there is currently no good built-in solution
for your use case. What you would ideally like to have is some operation
that can buffer records and "hold back" the watermark according to the
timestamps of the records that are in the buffer. This has the benefit
that it doesn't require adding a watermark operation in the middle of
the graph, which can lead to problems because you then have to make sure
to do proper per-partition watermarking there, which is ideally done in
a source.

It's certainly possible to write an operator that does such buffering
and holds back the watermark: you would have to override
processWatermark(), keep track of the timestamps of the buffered records
and the max watermark seen so far, and emit a modified watermark
downstream whenever there are updates and you evict records from the buffer.
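
The bookkeeping described above can be sketched without any Flink dependencies. WatermarkHoldingBuffer and its methods are hypothetical; onWatermark() stands in for an overridden processWatermark(), and emitted watermarks are collected in a list instead of being sent through a real operator output. The forwarded watermark is the minimum of the highest input watermark and the smallest buffered timestamp minus 1, and it is only emitted when it advances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical, framework-free model (not Flink API) of an operator that buffers
 * records and holds back the watermark so it never passes a buffered record.
 */
public class WatermarkHoldingBuffer {
    // Timestamps of records still in the buffer; peek() gives the smallest.
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();
    // Highest watermark received from upstream so far.
    private long maxInputWatermark = Long.MIN_VALUE;
    // Last watermark forwarded downstream; never decreases.
    private long lastEmitted = Long.MIN_VALUE;
    // Stand-in for the operator output: watermarks that would be sent downstream.
    private final List<Long> emitted = new ArrayList<>();

    /** Buffers a record. This can only lower the candidate, so nothing is emitted. */
    public void addRecord(long timestamp) {
        buffered.add(timestamp);
    }

    /** Stand-in for an overridden processWatermark(): remember the max and re-evaluate. */
    public void onWatermark(long watermark) {
        maxInputWatermark = Math.max(maxInputWatermark, watermark);
        maybeEmit();
    }

    /** Call after a buffered record has been emitted downstream or dropped. */
    public void evict(long timestamp) {
        buffered.remove(timestamp);
        maybeEmit();
    }

    public List<Long> emittedWatermarks() {
        return emitted;
    }

    // Output watermark = min(max input watermark, smallest buffered timestamp - 1),
    // forwarded only when it advances.
    private void maybeEmit() {
        long candidate = maxInputWatermark;
        if (!buffered.isEmpty()) {
            candidate = Math.min(candidate, buffered.peek() - 1);
        }
        if (candidate > lastEmitted) {
            lastEmitted = candidate;
            emitted.add(candidate);
        }
    }
}
```

For example, buffering a record at 1000 and then receiving input watermarks 500 and 2000 forwards 500 and then 999; evicting the record afterwards releases 2000.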

I'd be open to suggestions if someone wants to add a proper solution/API
for this to Flink.

Best,
Aljoscha

On 07.09.20 16:51, Theo Diefenthal wrote:
> [...]