two questions about flink stream processing: kafka sources and TimerService

Jin Yi
hello.  thanks ahead of time for anyone who answers.

1.  verifying my understanding: for a kafka source that's partitioned on the same piece of data that is later used in a keyBy, if we are relying on the kafka timestamp as the event timestamp, is it guaranteed that the event stream of the source is in the kafka pipeline's insertion order for the topic?

2.  is there a way to use the InternalTimerService from within a ProcessFunction (specifically, a KeyedCoProcessFunction)?  i don't see an easy way to do this, except by changing the TimerService interface.  the use case for my need is that i'd like to have timers to clean up the left and right keyed state using namespaced timers like how IntervalJoin does it (https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L256).  right now, b/c the KeyedCoProcessFunction only gives us the SimpleTimerService via the Context, i can only trigger onTimer execution without being able to refine the cleaning of state to just the event state of the side that a timer was originated from.  without this, it'll end up needing to visit state associated with both event streams which isn't performant as those streams can have different throughputs (and therefore, expect to have different retention characteristics/needs).

thanks.

Re: two questions about flink stream processing: kafka sources and TimerService

Jin Yi
ping?

On Tue, May 11, 2021 at 11:31 PM Jin Yi <[hidden email]> wrote:

Re: two questions about flink stream processing: kafka sources and TimerService

Ingo Bürk
Hi Jin,

1) As far as I know, the order is only guaranteed for events from the same partition. If you want events across partitions to remain in order, you may need to use parallelism 1. I'll attach some links here which might be useful:

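To make the per-partition point concrete, here is a small plain-Java sketch (no Kafka or Flink dependency). The `Rec` class, the two sample partitions, and the round-robin interleaving are all made up for illustration; a real consumer may interleave partitions in any way. It only shows that offsets stay ordered within each partition while timestamps across partitions need not be.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionOrderSketch {
    /** Hypothetical record shape: key, partition, partition-local offset, timestamp. */
    static final class Rec {
        final String key;
        final int partition;
        final long offset;
        final long timestamp;

        Rec(String key, int partition, long offset, long timestamp) {
            this.key = key;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /** Round-robin merge: one of many interleavings a consumer could observe. */
    static List<Rec> interleave(List<List<Rec>> partitions) {
        int longest = 0;
        for (List<Rec> p : partitions) {
            longest = Math.max(longest, p.size());
        }
        List<Rec> out = new ArrayList<>();
        for (int i = 0; i < longest; i++) {
            for (List<Rec> p : partitions) {
                if (i < p.size()) {
                    out.add(p.get(i));
                }
            }
        }
        return out;
    }

    /** True if offsets are strictly increasing within every partition. */
    static boolean orderedPerPartition(List<Rec> stream) {
        Map<Integer, Long> lastOffset = new HashMap<>();
        for (Rec r : stream) {
            Long prev = lastOffset.put(r.partition, r.offset);
            if (prev != null && prev >= r.offset) {
                return false;
            }
        }
        return true;
    }

    /** True if timestamps never decrease across the whole merged stream. */
    static boolean orderedGlobally(List<Rec> stream) {
        for (int i = 1; i < stream.size(); i++) {
            if (stream.get(i - 1).timestamp > stream.get(i).timestamp) {
                return false;
            }
        }
        return true;
    }

    /** Sample data: key "a" lives in partition 0, key "b" in partition 1. */
    static List<Rec> sample() {
        List<Rec> p0 = List.of(new Rec("a", 0, 0, 100), new Rec("a", 0, 1, 400));
        List<Rec> p1 = List.of(new Rec("b", 1, 0, 200), new Rec("b", 1, 1, 300));
        return interleave(List.of(p0, p1));
    }

    public static void main(String[] args) {
        List<Rec> seen = sample();
        System.out.println("per-partition order kept: " + orderedPerPartition(seen));
        System.out.println("global timestamp order kept: " + orderedGlobally(seen));
    }
}
```

If the later keyBy uses the same key that picked the partition, all events for one key come from one partition, so the per-partition guarantee is the one that matters for that key.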

2) Indeed, there doesn't seem to be a way to access the InternalTimerService from a ProcessFunction at the moment. One approach could be to implement this yourself using a MapState. Otherwise, I think you need to implement your own operator, from which you can access the InternalTimerService, similar to how KeyedCoProcessOperator does it.
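A rough sketch of the MapState idea, written as plain Java so it runs without Flink. Everything here is an assumption for illustration: the class name, the `Side` enum, and the per-side retention values are hypothetical, and the `TreeMap`s only stand in for keyed `MapState` for a single key. In a real KeyedCoProcessFunction, the timestamp returned by `onElement` would be passed to `ctx.timerService().registerEventTimeTimer(...)`. Since timers are deduplicated per key and timestamp, the sketch records which side(s) asked for each timestamp, so the timer callback only touches those sides.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SideAwareCleanupSketch {
    enum Side { LEFT, RIGHT }

    // Stand-in for a MapState: timer timestamp -> side(s) that requested it.
    private final TreeMap<Long, EnumSet<Side>> timerOwners = new TreeMap<>();
    // Stand-in for one MapState per side: event time -> buffered values.
    private final Map<Side, TreeMap<Long, List<String>>> buffers = new EnumMap<>(Side.class);
    // Hypothetical per-side retention, since the two streams may need different values.
    private final Map<Side, Long> retentionMs = new EnumMap<>(Side.class);

    SideAwareCleanupSketch(long leftRetentionMs, long rightRetentionMs) {
        retentionMs.put(Side.LEFT, leftRetentionMs);
        retentionMs.put(Side.RIGHT, rightRetentionMs);
        buffers.put(Side.LEFT, new TreeMap<>());
        buffers.put(Side.RIGHT, new TreeMap<>());
    }

    /** Buffers the event and returns the cleanup time a real function would register as a timer. */
    long onElement(Side side, long eventTime, String value) {
        buffers.get(side).computeIfAbsent(eventTime, t -> new ArrayList<>()).add(value);
        long cleanupAt = eventTime + retentionMs.get(side);
        timerOwners.computeIfAbsent(cleanupAt, t -> EnumSet.noneOf(Side.class)).add(side);
        return cleanupAt;
    }

    /** Timer callback: cleans only the side(s) that registered this timestamp. */
    void onTimer(long timerTime) {
        EnumSet<Side> owners = timerOwners.remove(timerTime);
        if (owners == null) {
            return; // nothing was registered for this timestamp
        }
        for (Side side : owners) {
            // The timer was set at eventTime + retention, so subtracting recovers the buffer key.
            buffers.get(side).remove(timerTime - retentionMs.get(side));
        }
    }

    /** Number of values currently buffered for one side. */
    int buffered(Side side) {
        int n = 0;
        for (List<String> values : buffers.get(side).values()) {
            n += values.size();
        }
        return n;
    }

    public static void main(String[] args) {
        SideAwareCleanupSketch s = new SideAwareCleanupSketch(1_000L, 10_000L);
        long leftTimer = s.onElement(Side.LEFT, 5_000L, "l1");
        s.onElement(Side.RIGHT, 5_000L, "r1");
        s.onTimer(leftTimer);
        // prints "0 1": the left entry expired, the right one is untouched
        System.out.println(s.buffered(Side.LEFT) + " " + s.buffered(Side.RIGHT));
    }
}
```

The extra map costs one state lookup per timer, but it avoids scanning the state of the side that did not request the cleanup.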


Regards
Ingo

On Wed, May 12, 2021 at 8:32 AM Jin Yi <[hidden email]> wrote:

Re: two questions about flink stream processing: kafka sources and TimerService

Jin Yi
thanks ingo!  i'll look at moving to rolling my own operator and using ConnectedStreams.transform with it.

On Tue, May 18, 2021 at 3:18 AM Ingo Bürk <[hidden email]> wrote: