Save state on a CoGroupFunction and recover it after a failure

Felipe Gutierrez
Hi,

I have a problem in my streaming pipeline: the events in a CoGroupFunction are not restored after the application crashes. The application looks like this:

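import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

stream01.coGroup(stream02)
    .where(...).equalTo(...)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .apply(new MyCoGroupFunction())
    .process(new MyProcessFunction())
    .addSink(new MySinkFunction());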

Checkpointing is configured with a 20-second interval, and the window is 1 minute long. I follow this sequence to reproduce the error:
1 - send 6 events to stream01
2 - after 25 seconds, send an event that makes the application crash
3 - in the meantime, the application recovers
4 - after 25 seconds, send 6 events to stream02

Then MyCoGroupFunction receives only the events from stream02. Is this a case where I have to use a RichCoGroupFunction and save the state by implementing CheckpointedFunction? I am confused because CoGroupFunction.coGroup() is called only when the window closes, and only then do I see this operator's output events, that is, when Collector.collect() is called.
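
To make the symptom concrete, here is a plain-Java model of a single window firing in stream01.coGroup(stream02). It is not Flink code: `CoGroupModel` and `coGroupWindow` are hypothetical names, and the sketch only mimics how a CoGroupFunction is invoked once per key that appears in either input, with all of that key's elements from each side. A key seen on only one side gets an empty group for the other side, which is the shape of what MyCoGroupFunction observes when the stream01 side is empty after the restart.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model (not Flink API) of one fired window of
// stream01.coGroup(stream02).where(firstKey).equalTo(secondKey).
final class CoGroupModel {
    // Groups both inputs by key and "calls" the co-group logic once per key seen on either side.
    // The result records how many elements each side contributed for that key.
    static <A, B, K> Map<K, List<String>> coGroupWindow(
            List<A> first, List<B> second,
            Function<A, K> firstKey, Function<B, K> secondKey) {
        Map<K, List<A>> left = new LinkedHashMap<>();
        Map<K, List<B>> right = new LinkedHashMap<>();
        for (A a : first) {
            left.computeIfAbsent(firstKey.apply(a), k -> new ArrayList<>()).add(a);
        }
        for (B b : second) {
            right.computeIfAbsent(secondKey.apply(b), k -> new ArrayList<>()).add(b);
        }

        // Union of keys from both sides, first-side keys first.
        List<K> keys = new ArrayList<>(left.keySet());
        for (K k : right.keySet()) {
            if (!left.containsKey(k)) {
                keys.add(k);
            }
        }

        Map<K, List<String>> result = new LinkedHashMap<>();
        for (K k : keys) {
            List<A> firstGroup = left.getOrDefault(k, List.of());   // empty if only the second side had k
            List<B> secondGroup = right.getOrDefault(k, List.of()); // empty if only the first side had k
            // Stand-in for MyCoGroupFunction.coGroup(firstGroup, secondGroup, collector).
            result.put(k, List.of("first=" + firstGroup.size(), "second=" + secondGroup.size()));
        }
        return result;
    }
}
```

For example, calling coGroupWindow with an empty first list and a single stream02 element for key "a" yields {a=[first=0, second=1]}: the function still runs for that key, but with nothing from stream01.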

My understanding is that the events are held in memory and CoGroupFunction.coGroup() is called when the window closes, so I would have to snapshot the state in an operator before the CoGroupFunction. Is that correct? If anyone has a toy example of this (a CoGroupFunction with checkpointing, tested in a unit test), could you please send me the link?
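
Putting numbers on the reproduction steps: with a 20-second checkpoint interval, a crash at t = 25 s rolls back to the checkpoint triggered at t = 20 s. That checkpoint was taken after the six stream01 events arrived and while the 1-minute window starting at t = 0 was still open, so those events can only come back if, at t = 20 s, they were held in something the checkpoint captures. The sketch assumes the stream01 events carry timestamps near t = 0, treats job start, event time, and window alignment as the same t = 0 with no window offset, and assumes each checkpoint completes before the crash; `Timeline` and its two methods are hypothetical helpers, not Flink API.

```java
// Hypothetical helpers (not Flink API) for reasoning about the reproduction timeline.
// All times are milliseconds, measured from a shared t = 0 (a simplifying assumption).
final class Timeline {
    // Most recent checkpoint trigger time at or before t, assuming checkpoints fire at
    // interval, 2 * interval, ... and complete promptly. Returns 0 if none has fired yet.
    static long lastCheckpointAtOrBefore(long t, long intervalMs) {
        return (t / intervalMs) * intervalMs;
    }

    // Start of the tumbling window containing ts, assuming ts >= 0 and no window offset.
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }
}
```

For these steps, lastCheckpointAtOrBefore(25_000, 20_000) is 20_000 and windowStart(25_000, 60_000) is 0, so the restored checkpoint falls inside the window that holds the stream01 events. Whether the later stream02 events land in the same window depends on their event timestamps and on how long recovery takes.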

Thanks,
Felipe

Re: Save state on a CoGroupFunction and recover it after a failure

rmetzger0
Hi Felipe,

Which data source are you using?

> Then, in the MyCoGroupFunction there are only events of stream02

Are you storing events in your state?

> Is this the case where I have to use RichCoGroupFunction and save the state by implementing the CheckpointedFunction?

If you want your state to be persisted with each checkpoint and recovered after a failure, yes.
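
Here is a plain-Java model of what "persisted with each checkpoint and recovered after a failure" means for a function that buffers events in its own fields. It follows the shape of the CheckpointedFunction pattern: snapshotState() copies the working buffer into durable state, and initializeState() copies it back when the job restarts from a checkpoint. `CheckpointStore` and `BufferingFunction` are hypothetical stand-ins so the sketch runs without Flink, and the restart is simulated by creating a fresh instance.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the durable copy a checkpoint keeps (plays the role of ListState).
final class CheckpointStore {
    private final List<String> saved = new ArrayList<>();

    void replaceWith(List<String> items) {
        saved.clear();
        saved.addAll(items);
    }

    List<String> get() {
        return new ArrayList<>(saved);
    }
}

// Hypothetical function that buffers events in a plain field, following the
// snapshotState()/initializeState() shape of Flink's CheckpointedFunction.
final class BufferingFunction {
    // Working buffer: lives only in this instance's memory.
    private final List<String> buffer = new ArrayList<>();

    void process(String event) {
        buffer.add(event);
    }

    // Analogue of snapshotState(): invoked on every checkpoint, overwrites the saved copy.
    void snapshotState(CheckpointStore store) {
        store.replaceWith(buffer);
    }

    // Analogue of initializeState(): invoked on (re)start; restores only after a failure.
    void initializeState(CheckpointStore store, boolean isRestored) {
        if (isRestored) {
            buffer.addAll(store.get());
        }
    }

    int bufferedCount() {
        return buffer.size();
    }
}
```

In a real Flink job, the store would be a ListState obtained from the FunctionInitializationContext passed to initializeState(), and the restore flag would come from context.isRestored(). Without the snapshotState() step, the fresh instance starts with an empty buffer, the same symptom as the stream01 events disappearing after the restart.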

(In reply to Felipe Gutierrez, Tue, Jun 15, 2021 at 6:18 PM)

Re: Save state on a CoGroupFunction and recover it after a failure

Felipe Gutierrez
Hi Robert,

1 - I am using Kafka010 as the data source.
2 - No, I am not using any kind of ListState, which I think I must use.
3 - Good. I am going to use CheckpointedFunction.

Just a follow-up question. I was reimplementing it with a CoProcessFunction to save the state and trigger the window myself. Based on your answer, I think I am overcomplicating it: if I just use a RichCoGroupFunction, save the state in a ListState, and implement CheckpointedFunction, will it do everything I need? Then I would not have to implement the event-time window trigger in onTimer() and could just use Flink's regular windows. Is that correct?

Thanks

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

(In reply to Robert Metzger, Wed, Jun 16, 2021 at 2:16 PM)

Re: Save state on a CoGroupFunction and recover it after a failure

Felipe Gutierrez
I don't understand how I can save the state of a window in a RichCoGroupFunction if the events only reach RichCoGroupFunction.coGroup() when the window closes. In that case, upon a failure I would not recover the events that were in the window. This is why I think the right approach is a CoProcessFunction, where I can update the state as events arrive in CoProcessFunction.processElement1() and CoProcessFunction.processElement2().
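
The CoProcessFunction approach can be modeled in plain Java: each input is buffered per key and per window as it arrives (processElement1/processElement2), a timer is registered at the window's last timestamp, and both buffers are emitted and cleared once the watermark passes that timer (onTimer). `CoBufferModel` and its methods are hypothetical and do not model checkpointing itself; in Flink the buffers would be keyed ListState (or MapState) and the timers would go through the context's TimerService, both of which are included in checkpoints.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical plain-Java model (not Flink API) of a co-group built on a CoProcessFunction.
// Each side is buffered per key and per tumbling window as elements arrive; a timer at the
// window's last timestamp emits both buffers and clears them once the watermark passes it.
final class CoBufferModel {
    private final long windowSizeMs;
    // key -> window end -> buffered values, one map per input
    private final Map<String, TreeMap<Long, List<String>>> side1 = new HashMap<>();
    private final Map<String, TreeMap<Long, List<String>>> side2 = new HashMap<>();
    // timer time -> keys with a timer registered at that time
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();
    final List<String> output = new ArrayList<>();

    CoBufferModel(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Analogues of processElement1() / processElement2().
    void processElement1(String key, String value, long ts) { buffer(side1, key, value, ts); }
    void processElement2(String key, String value, long ts) { buffer(side2, key, value, ts); }

    private void buffer(Map<String, TreeMap<Long, List<String>>> side, String key, String value, long ts) {
        long windowEnd = ts - (ts % windowSizeMs) + windowSizeMs; // assumes ts >= 0, no offset
        side.computeIfAbsent(key, k -> new TreeMap<>())
            .computeIfAbsent(windowEnd, w -> new ArrayList<>())
            .add(value);
        // Registering the same (time, key) twice is a no-op, as with Flink's timer service.
        timers.computeIfAbsent(windowEnd - 1, t -> new TreeSet<>()).add(key);
    }

    // Fires every timer at or before the watermark; the loop body is the analogue of onTimer().
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            long windowEnd = due.getKey() + 1;
            for (String key : due.getValue()) {
                List<String> first = take(side1, key, windowEnd);
                List<String> second = take(side2, key, windowEnd);
                output.add(key + "@" + windowEnd + " first=" + first + " second=" + second);
            }
        }
    }

    // Removes and returns one key's buffer for one window (empty if that side sent nothing).
    private static List<String> take(Map<String, TreeMap<Long, List<String>>> side, String key, long windowEnd) {
        TreeMap<Long, List<String>> perWindow = side.get(key);
        if (perWindow == null) {
            return List.of();
        }
        List<String> values = perWindow.remove(windowEnd);
        return values == null ? List.of() : values;
    }
}
```

With a 60-second window, an element on each side for key "k" at t = 1 s and t = 50 s is emitted together when the watermark reaches 59,999 ms, while a second-side element at t = 61 s waits for the next window's timer.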

(In reply to Felipe Gutierrez, Wed, Jun 16, 2021 at 4:28 PM)