Hi,

I have a problem in my stream pipeline: the events in a CoGroupFunction are not restored after the application crashes. The application looks like this:

    stream01.coGroup(stream02)
        .where(...).equalTo(...)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .apply(new MyCoGroupFunction())
        .process(new MyProcessFunction())
        .addSink(new MySinkFunction());

The checkpoint interval is 20 seconds and the window is 1 minute. I follow this sequence to reproduce the error:

1 - send 6 events to stream01
2 - after 25 seconds, send an event that makes the application crash
3 - in the meantime, the application recovers
4 - after 25 seconds, send 6 events to stream02

Then, in the MyCoGroupFunction there are only events of stream02.

Is this the case where I have to use RichCoGroupFunction and save the state by implementing the CheckpointedFunction? I am confused because CoGroupFunction.coGroup() is called only when the window closes, and only then do I see this operator's output events (that is when Collector.collect() is called). What I think is that the events are held in memory, and CoGroupFunction.coGroup() is called when the window closes. So I would have to snapshot the state in an operator before the CoGroupFunction. Is that correct?

In case anyone has a toy example of this (a CoGroupFunction with checkpointing, tested in a unit test), could you please send me the link?

Thanks,
Felipe
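The timing in this reproduction can be checked with a little arithmetic. The sketch below is plain Java with no Flink dependency; `ScenarioTiming` and its methods are hypothetical names, and it assumes that event timestamps equal the send times in milliseconds since the job started, that the 6 stream01 events arrive at roughly 0 to 5 seconds, that the tumbling window has zero offset, and that checkpoints complete exactly every 20 seconds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: plain-Java timing arithmetic for the reproduction scenario.
// Assumes timestamps are send times in ms since job start, a zero-offset tumbling
// window, and checkpoints completing exactly at multiples of the interval.
final class ScenarioTiming {
    // Start of the tumbling window containing a non-negative timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    // Latest checkpoint triggered at or before the crash; 0 means none yet.
    static long lastCheckpointAtOrBefore(long crashMs, long intervalMs) {
        return (crashMs / intervalMs) * intervalMs;
    }

    // Checkpoints taken strictly inside the open window. Whatever the window has
    // buffered at those moments must be in the checkpoint to survive a crash.
    static List<Long> checkpointsInsideWindow(long windowStartMs, long windowEndMs, long intervalMs) {
        List<Long> result = new ArrayList<>();
        long first = (windowStartMs / intervalMs + 1) * intervalMs;
        for (long t = first; t < windowEndMs; t += intervalMs) {
            result.add(t);
        }
        return result;
    }

    public static void main(String[] args) {
        long size = 60_000L;
        long interval = 20_000L;
        long start = windowStart(5_000L, size); // last stream01 event, assumed at 5 s
        long end = windowEnd(5_000L, size);
        System.out.println("window [" + start + ", " + end + ")");
        System.out.println("checkpoints inside window: " + checkpointsInsideWindow(start, end, interval));
        System.out.println("restore from checkpoint at: " + lastCheckpointAtOrBefore(25_000L, interval));
    }
}
```

Under these assumptions, `main` reports the window [0, 60000), checkpoints at 20000 and 40000 inside it, and recovery from the checkpoint at 20000. The stream01 events can therefore reach MyCoGroupFunction after the crash only if the buffered window contents are part of that checkpoint.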
Hi Felipe,

Which data source are you using?

> Then, in the MyCoGroupFunction there are only events of stream02

Are you storing events in your state?

> Is this the case where I have to use RichCoGroupFunction and save the state by implementing the CheckpointedFunction?

If you want your state to be persisted with each checkpoint and recovered after a failure, yes.

On Tue, Jun 15, 2021 at 6:18 PM Felipe Gutierrez wrote:
Hi Robert,

1 - I am using Kafka010 as the data source.
2 - No, I am not using any kind of ListState. I think that is what I should be using.
3 - Good. I am going to use CheckpointedFunction.

Just a follow-up question. I was reimplementing it with a CoProcessFunction to save the state and trigger the window, so based on your answer I think I am overcomplicating it. If I just use a RichCoGroupFunction, save the state in a ListState, and implement CheckpointedFunction, it will do everything I need. Is that correct? Then I don't have to implement the event-time window trigger in onTimer(); I just use the regular Flink window. Is that correct?

Thanks
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

On Wed, Jun 16, 2021 at 2:16 PM Robert Metzger wrote:
I don't understand how I can save the state of a window in the RichCoGroupFunction if the events arrive at RichCoGroupFunction.coGroup() only when the window closes. Then, upon a failure, I will not recover the events that were in the window. This is why I think the approach to this problem is to use a CoProcessFunction, where I can update the state with the events arriving at CoProcessFunction.processElement1() and CoProcessFunction.processElement2().

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

On Wed, Jun 16, 2021 at 4:28 PM Felipe Gutierrez wrote:
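The CoProcessFunction approach can be modeled without Flink to see how the state flows: each input is appended to a per-window, per-key buffer as soon as it arrives, a deep copy of those buffers stands in for a checkpoint, and the co-grouped result is emitted once the watermark reaches the window end. This is a plain-Java sketch under stated assumptions, not Flink code: `CoGroupBufferModel`, `Result`, `advanceWatermark`, `snapshot` and `restore` are hypothetical names that only mirror processElement1, processElement2 and onTimer, keys and values are assumed to be strings, and a window is assumed to fire when its end is at or before the watermark. In an actual KeyedCoProcessFunction the buffers would typically be keyed ListState obtained from getRuntimeContext(), which Flink checkpoints without a CheckpointedFunction; Flink's built-in window operator likewise keeps an open window's contents in keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical plain-Java model (no Flink): buffers both inputs per window and key,
// can be snapshotted like a checkpoint, and emits one co-grouped Result per key
// once the watermark reaches the window end.
final class CoGroupBufferModel {
    // One output record: what coGroup() would receive for a key and window.
    static final class Result {
        final String key;
        final long windowEnd;
        final List<String> left, right;
        Result(String key, long windowEnd, List<String> left, List<String> right) {
            this.key = key; this.windowEnd = windowEnd; this.left = left; this.right = right;
        }
        @Override public String toString() {
            return key + "@" + windowEnd + " left=" + left + " right=" + right;
        }
    }

    private final long windowSizeMs;
    // windowEnd -> key -> buffered values, one map per input stream.
    private Map<Long, Map<String, List<String>>> left = new TreeMap<>();
    private Map<Long, Map<String, List<String>>> right = new TreeMap<>();

    CoGroupBufferModel(long windowSizeMs) { this.windowSizeMs = windowSizeMs; }

    // Exclusive end of the zero-offset tumbling window that contains ts (ts >= 0).
    private long windowEnd(long ts) { return ts - (ts % windowSizeMs) + windowSizeMs; }

    private static void append(Map<Long, Map<String, List<String>>> buf, long end, String key, String value) {
        buf.computeIfAbsent(end, e -> new HashMap<>()).computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Mirror processElement1/processElement2: each event goes into state on arrival.
    void processElement1(String key, String value, long ts) { append(left, windowEnd(ts), key, value); }
    void processElement2(String key, String value, long ts) { append(right, windowEnd(ts), key, value); }

    // Mirrors onTimer: fire every window whose end is at or before the watermark,
    // emit one Result per key with both sides, then drop that window's state.
    List<Result> advanceWatermark(long watermarkMs) {
        TreeSet<Long> ends = new TreeSet<>(left.keySet());
        ends.addAll(right.keySet());
        List<Result> out = new ArrayList<>();
        for (long end : ends.headSet(watermarkMs, true)) {
            Map<String, List<String>> l = left.remove(end), r = right.remove(end);
            if (l == null) l = new HashMap<>();
            if (r == null) r = new HashMap<>();
            TreeSet<String> keys = new TreeSet<>(l.keySet());
            keys.addAll(r.keySet());
            for (String key : keys) {
                out.add(new Result(key, end, l.getOrDefault(key, new ArrayList<>()), r.getOrDefault(key, new ArrayList<>())));
            }
        }
        return out;
    }

    // Mirrors a checkpoint: deep-copies both buffers so later input cannot change it.
    CoGroupBufferModel snapshot() {
        CoGroupBufferModel copy = new CoGroupBufferModel(windowSizeMs);
        copy.left = deepCopy(left);
        copy.right = deepCopy(right);
        return copy;
    }

    // Mirrors recovery: a new operator instance initialized from the checkpoint.
    static CoGroupBufferModel restore(CoGroupBufferModel checkpoint) { return checkpoint.snapshot(); }

    private static Map<Long, Map<String, List<String>>> deepCopy(Map<Long, Map<String, List<String>>> src) {
        Map<Long, Map<String, List<String>>> dst = new TreeMap<>();
        src.forEach((end, byKey) -> {
            Map<String, List<String>> byKeyCopy = new HashMap<>();
            byKey.forEach((k, v) -> byKeyCopy.put(k, new ArrayList<>(v)));
            dst.put(end, byKeyCopy);
        });
        return dst;
    }

    public static void main(String[] args) {
        CoGroupBufferModel op = new CoGroupBufferModel(60_000L);
        for (int i = 0; i < 6; i++) op.processElement1("k", "a" + i, i * 1_000L); // stream01 at 0-5 s
        CoGroupBufferModel checkpoint = op.snapshot(); // checkpoint at 20 s
        op = restore(checkpoint);                      // crash at 25 s, then recovery
        for (int i = 0; i < 6; i++) op.processElement2("k", "b" + i, 50_000L + i * 1_000L); // stream02 at 50-55 s
        op.advanceWatermark(60_000L).forEach(System.out::println);
    }
}
```

Restoring from the snapshot and then feeding the stream02 events prints a single record for key k at window end 60000 containing all six stream01 and all six stream02 values. A fresh instance created without the restore reproduces the reported symptom: its result holds only stream02 values.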