Hi all,
I was wondering what approaches people usually take to reprocessing data with Flink - specifically the case where you want to upgrade a Flink job, and make it reprocess historical data before continuing to process a live stream.

I'm wondering if we can do something similar to the 'simple rewind' or 'parallel rewind' which Samza uses to solve this problem, discussed here: https://samza.apache.org/learn/documentation/0.10/jobs/reprocessing.html

Having used Flink over the past couple of months, the main issue I've had involves Flink's internal state - in my experience it is easy to break the state when upgrading a job or when changing the parallelism of operators, and there's no easy way to view/access an internal key-value state from outside Flink.

For an example of what I mean, consider a Flink job which consumes a stream of 'updates' to items, and maintains a key-value store of items within Flink's internal state (e.g. in RocksDB). The job also writes the updated items to a Kafka topic:

http://oi64.tinypic.com/34q5opf.jpg

My worry with this is that the state in RocksDB could be lost or become incompatible with an updated version of the job. If this happens, we need to be able to rebuild Flink's internal key-value store in RocksDB. So I'd like to be able to do something like this (which I believe is the Samza solution):

http://oi67.tinypic.com/219ri95.jpg

Has anyone done something like this already with Flink? If so, are there any examples of how to do this replay & switchover (rebuild state by consuming from a historical log, then switch over to processing the live stream)?

Thanks for any insights,
Josh
|
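To make the example job concrete, it can be modelled without any Flink APIs as a fold over updates. This is only a sketch in plain Scala under stated assumptions: `Update`, `Item`, `step` and `run` are hypothetical names, an in-memory `Map` stands in for the RocksDB-backed state, and a `Vector` stands in for the output Kafka topic.

```scala
object ItemJob {
  // Hypothetical shapes: an update sets one field of one item.
  final case class Update(itemId: String, field: String, value: String)
  final case class Item(id: String, fields: Map[String, String])

  // Stand-in for the key-value state kept inside the job.
  type Store = Map[String, Item]

  // Apply one update; return the new store and the updated item to emit.
  def step(store: Store, u: Update): (Store, Item) = {
    val current = store.getOrElse(u.itemId, Item(u.itemId, Map.empty))
    val updated = current.copy(fields = current.fields.updated(u.field, u.value))
    (store.updated(u.itemId, updated), updated)
  }

  // Process a sequence of updates; return the final store and the emitted items in order.
  def run(initial: Store, updates: Seq[Update]): (Store, Vector[Item]) =
    updates.foldLeft((initial, Vector.empty[Item])) { case ((store, out), u) =>
      val (next, emitted) = step(store, u)
      (next, out :+ emitted)
    }
}
```

In this model, losing the state corresponds to starting again from an empty `Store`, and calling `run` over the full update log is the replay step the question is about.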
Hey Josh,
The way we replay historical data is we have a second Flink job that listens to the same live stream, and stores every single event in Google Cloud Storage.

When the main Flink job that is processing the live stream gets a request for a specific data set that it has not been processing yet, it sends a request to the historical Flink job for the old data. The live job then starts storing relevant events from the live stream in state. It continues storing the live events until all the events from the historical job have been processed, then it processes the stored events, and finally starts processing the live stream again.

As long as it's properly keyed (we key on the specific data set) then it doesn't block anything, keeps everything ordered, and eventually catches up. It also allows us to completely blow away state and rebuild it from scratch.

So in your case it looks like what you could do is send a request to the "historical" job whenever you get an item that you don't yet have the current state of.

The potential problems you may have are that it may not be possible to store every single historical event, and that you need to make sure there is enough memory to handle the ever-increasing state size while the historical events are being replayed (and make sure to clear the state when it is done).

It's a little complicated, and pretty expensive, but it works. Let me know if something doesn't make sense.

On Thu, Jul 28, 2016 at 1:14 PM, Josh <[hidden email]> wrote:
Jason Brelloch | Product Developer
|
Hi Jason,

Thanks for the reply - I didn't quite understand all of it though!

> it sends a request to the historical flink job for the old data

How do you send a request from one Flink job to another?

> It continues storing the live events until all the events from the historical job have been processed, then it processes the stored events, and finally starts processing the live stream again.

How do you handle the switchover between the live stream and the historical stream? Do you somehow block the live stream source and detect when the historical data source is no longer emitting new elements?

> So in your case it looks like what you could do is send a request to the "historical" job whenever you get an item that you don't yet have the current state of.

In my case I would want the Flink state to always contain the latest state of every item (except when the job first starts and there's a period of time where it's rebuilding its state from the Kafka log). Since I would have everything needed to rebuild the state persisted in a Kafka topic, I don't think I would need a second Flink job for this?

Thanks,
Josh

On Thu, Jul 28, 2016 at 6:57 PM, Jason Brelloch <[hidden email]> wrote:
|
Hi,

I think the exact thing you're trying to do is not possible right now, but I know of a workaround that some people have used.

For "warming up" the state from the historical data, you would run your regular Flink job but replace the normal Kafka source with a source that reads from the historical data. Then, when all data has been read, you perform a savepoint and stop the job. Then, you start the same job again but with a Kafka source that reads from your regular input stream. This way you restore with the warmed-up state.

Now, for the proper way of doing this, I actually have a design doc and prototype that could be used to implement this: https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit. This suggests adding an N-input operator. The interesting thing about my prototype implementation is that the operator first reads from the inputs that are known to be bounded, in series. Then it proceeds to reading the streaming inputs. I think that would be the basis for a solution to your problem. And I'm sure other people are facing that as well.

Cheers,
Aljoscha

On Fri, 29 Jul 2016 at 11:27 Josh <[hidden email]> wrote:
|
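The warm-up workaround can be pictured as two runs of the same job logic with a state hand-over in between. The following is a plain-Scala sketch rather than Flink code: `runUntilExhausted` and the `Savepoint` alias are made-up names, and a `Map` of per-key counts stands in for whatever operator state a real savepoint would capture.

```scala
object WarmUp {
  // Hypothetical stand-in for a savepoint: just the keyed state.
  type Savepoint = Map[String, Long]

  // The same job logic is used for both runs: count events per key,
  // starting from whatever state was restored.
  def runUntilExhausted(restored: Savepoint, source: Iterator[String]): Savepoint =
    source.foldLeft(restored) { (state, key) =>
      state.updated(key, state.getOrElse(key, 0L) + 1L)
    }

  def main(args: Array[String]): Unit = {
    val historical = Iterator("a", "b", "a") // bounded source, read first
    val live       = Iterator("a", "c")      // stands in for the regular input stream

    // Run 1: warm up from the historical data, then "take a savepoint and stop".
    val savepoint = runUntilExhausted(Map.empty, historical)

    // Run 2: start the same job from the savepoint, now with the live source.
    val state = runUntilExhausted(savepoint, live)
    println(state)
  }
}
```

The point of the sketch is only the ordering: the bounded input is drained completely before the live input is touched, which is also the behaviour described for the N-input operator prototype.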
Hi,

This might be useful to you

Thanks,
Jagat Singh

On 29 July 2016 at 20:59, Aljoscha Krettek <[hidden email]> wrote:
|
Aljoscha's approach is probably better, but to answer your questions...

> How do you send a request from one Flink job to another?

All of our different Flink jobs communicate over Kafka. So the main Flink job would be listening to both a "live" Kafka source and a "historical" Kafka source. The historical Flink job would listen to a "request" Kafka source. When the main job gets an event that it does not have state for, it writes to the "request" topic. The historical job would read the request, grab the relevant old events from GCS, and write them to the "historical" Kafka topic. The "historical" source and the "live" source are merged and proceed through the main Flink job as one stream.

> How do you handle the switchover between the live stream and the historical stream? Do you somehow block the live stream source and detect when the historical data source is no longer emitting new elements?

When the main job sends a request to the historical job, the main job starts storing any events that come in for that key. As the historical events come in, they are processed immediately. The historical Flink job flags the last event it sends. When the main Flink job sees the flagged event, it knows it is caught up to where it was when it sent the request. You can then process the events that the main job stored, and when that is done you are caught up to the live stream, and can stop storing events for that key and just process them as normal.

Keep in mind that this is the dangerous part that I was talking about, where memory in the main job would continue to build until the "historical" events are all processed.

> In my case I would want the Flink state to always contain the latest state of every item (except when the job first starts and there's a period of time where it's rebuilding its state from the Kafka log).

You could absolutely do it by reading from the beginning of a Kafka topic. The reason we do it with GCS is that it is really cheap storage, and we are not planning on storing events forever on the Kafka topic.

> Since I would have everything needed to rebuild the state persisted in a Kafka topic, I don't think I would need a second Flink job for this?

The reason for the second Flink job in our case is that we didn't really want to block the Flink task slot while a single key gets caught up. We have a much larger key domain than we have task slots, so there would be multiple keys on a single task slot. If you go with the single-job approach (which might be the right approach for you guys), any other keys on that task slot will be blocked while the one key is getting its state built up.

Hope that helps,

On Fri, Jul 29, 2016 at 5:27 AM, Josh <[hidden email]> wrote:
Jason Brelloch | Product Developer
|
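The request/flag protocol described in this reply can be written down as a small per-key state machine. The sketch below is plain Scala with no Flink or Kafka calls, and every name in it (`Live`, `Historical`, `Buffering`, `CaughtUp`, `Job`, `handle`) is hypothetical; the `requests` vector stands in for the "request" topic and `processed` records the order in which payloads are handled.

```scala
object CatchUp {
  sealed trait Event { def key: String }
  final case class Live(key: String, payload: String) extends Event
  // `last` models the flag the historical job sets on the final event it sends.
  final case class Historical(key: String, payload: String, last: Boolean) extends Event

  // Per-key status: no entry = never seen, Buffering = waiting for history, CaughtUp = live.
  sealed trait Status
  final case class Buffering(pending: Vector[String]) extends Status
  case object CaughtUp extends Status

  final case class Job(
    status: Map[String, Status] = Map.empty,
    requests: Vector[String] = Vector.empty,  // keys written to the "request" topic
    processed: Vector[String] = Vector.empty  // payloads in processing order
  )

  def handle(job: Job, event: Event): Job = event match {
    case Live(k, p) =>
      job.status.get(k) match {
        // First event for an unknown key: ask for history and start buffering.
        case None =>
          job.copy(status = job.status.updated(k, Buffering(Vector(p))),
                   requests = job.requests :+ k)
        // Still waiting for history: keep buffering (this is where memory grows).
        case Some(Buffering(pending)) =>
          job.copy(status = job.status.updated(k, Buffering(pending :+ p)))
        // Already caught up: process live events directly.
        case Some(CaughtUp) =>
          job.copy(processed = job.processed :+ p)
      }
    case Historical(k, p, last) =>
      // Historical events are processed immediately.
      val afterHist = job.copy(processed = job.processed :+ p)
      (last, job.status.get(k)) match {
        // Flagged last event: drain the buffer in order and clear it.
        case (true, Some(Buffering(pending))) =>
          afterHist.copy(status = job.status.updated(k, CaughtUp),
                         processed = afterHist.processed ++ pending)
        case _ => afterHist
      }
  }
}
```

Folding a mixed sequence of events through `handle` shows the ordering guarantee: everything historical for a key is processed before any buffered live event for that key, and the buffer is dropped once the flagged event arrives.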
@Aljoscha - The N-input operator way sounds very nice. For now I think I'll try to get something quick running the hacky way, then if we decide to make this a permanent solution maybe I can work on the proper solution. I was wondering about your suggestion for "warming up" the state and then taking a savepoint and switching sources - since the Kafka sources are stateful and are part of Flink's internal state, wouldn't this break when trying to restore the job with a different source? Would I need to assign the replay source a UID, and when switching from replay to live, remove the replay source and replace it with a dummy operator with the same UID?

@Jason - I see what you mean now, with the historical and live Flink jobs. That's an interesting approach - I guess it's solving a slightly different problem to my 'rebuilding Flink state upon starting job' - as you're rebuilding state as part of the main job when it comes across events that require historical data. Actually I think we'll need to do something very similar in the future, but right now I can probably get away with something simpler!

Thanks for the replies!

Josh

On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch <[hidden email]> wrote:
|
Hi,

I have to try this to verify, but I think the approach works if you give the two sources different UIDs. The reason is that Flink will ignore state for which it doesn't have an operator to assign it to. Therefore, the state of the "historical Kafka source" should be silently discarded.

Cheers,
Aljoscha

On Fri, 29 Jul 2016 at 18:12 Josh <[hidden email]> wrote:
|
Cool, thanks - I've tried out the approach where we replay data from the Kafka compacted log, then take a savepoint and switch to the live stream.

It works but I did have to add in a dummy operator for every operator that was removed. Without doing this, I got an exception:

    java.lang.IllegalStateException: Failed to rollback to savepoint Checkpoint 1 @ 1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot map old state for task 02ea922553bc7522bdea373f52a702d6 to the new program. This indicates that the program has been changed in a non-compatible way after the savepoint.

I had a Kafka source and a flat mapper chained together when replaying, so to make it work I had to add two dummy operators and assign the same UID I used when replaying, like this:

    stream.map(x => x).uid("kafka-replay").name("dummy-1").startNewChain().map(x => x).name("dummy-2")

I guess it would be nice if Flink could recover from removed tasks/operators without needing to add dummy operators like this.

Josh

On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek <[hidden email]> wrote:
|
+Ufuk, looping him in directly

Hmm, I think this is changed for the 1.1 release. Ufuk could you please comment?

On Mon, 1 Aug 2016 at 08:07 Josh <[hidden email]> wrote:
|
No, unfortunately this is the same for 1.1. The idea was to be explicit about what works and what does not. I see that this is actually a pain for this use case (which is very nice and reasonable ;)). I think we can either always ignore state that does not match the new job or, if that is too aggressive, add a flag to ignore unmatched state.

On Mon, Aug 1, 2016 at 6:39 PM, Aljoscha Krettek <[hidden email]> wrote:
|
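The restore behaviour under discussion (fail on unmatched state versus ignore it behind a flag) can be modelled in a few lines. This is a toy model in plain Scala, not Flink's restore code: `restore` and the `ignoreUnmatched` parameter are hypothetical names chosen for illustration, and a `String` stands in for each operator's state.

```scala
object Restore {
  // Savepoint contents keyed by operator UID; the String is a placeholder for real state.
  type Savepoint = Map[String, String]

  // Map savepoint state onto the operators of the new job.
  // Returns Left(error) when some state has no matching operator and
  // unmatched state is not being ignored.
  def restore(
      savepoint: Savepoint,
      jobUids: Set[String],
      ignoreUnmatched: Boolean
  ): Either[String, Map[String, String]] = {
    val unmatched = savepoint.keySet.diff(jobUids)
    if (unmatched.nonEmpty && !ignoreUnmatched)
      Left(s"Cannot map old state for ${unmatched.toSeq.sorted.mkString(", ")} to the new program")
    else
      // Operators that are new in the job and absent from the savepoint start with empty state.
      Right(jobUids.map(uid => uid -> savepoint.getOrElse(uid, "")).toMap)
  }
}
```

With a savepoint containing state for `"kafka-replay"`, a strict restore into a job without that UID fails, which is the situation the dummy operators work around by keeping the UID alive; with the flag set, the replay source's state is simply dropped and a new live source starts empty.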
+1 to ignore unmatched state.

Also +1 to allow programs that resume partially (add some new state that starts empty).

Both are quite important for program evolution.

On Tue, Aug 2, 2016 at 2:58 PM, Ufuk Celebi <[hidden email]> wrote:
|
Hi,

I think it would probably be a good idea to make these tunable from the command line. Otherwise we might run into the problem of accidentally restoring a job that should fail like it does now.

Gyula

Stephan Ewen <[hidden email]> wrote (on Tue, 2 Aug 2016, 17:17):
|
Hi everyone,
I found this thread while examining an issue where Flink could not start from a savepoint. The problem was that we had removed an operator, pretty much the same thing that happened to Josh earlier in this thread. We are still using 1.0.1, so this is expected behavior, but I wondered whether there is any news concerning this topic.

Thank you and best regards!
Konstantin

On 02.08.2016 22:45, Gyula Fóra wrote:

--
Konstantin Gregor * [hidden email]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office: Unterföhring * Amtsgericht München * HRB 135082
|
On Wed, Oct 26, 2016 at 3:06 PM, Konstantin Gregor <[hidden email]> wrote:

> We are still using 1.0.1 so this is an expected behavior, but I just wondered whether there are any news concerning this topic.

Yes, we will add an option to ignore this while restoring. This will be added to the upcoming 1.1.4 and 1.2.0 releases. What are your plans with regard to upgrading to Flink 1.1?

– Ufuk
|
Hi Ufuk,
Thanks for this information, this is good news! Updating Flink to 1.1 is not really in our hands, but will hopefully happen soon :-)

Thank you and best regards
Konstantin

On 26.10.2016 16:07, Ufuk Celebi wrote:
|