Hello,
I have a question that has been bugging me. Let's say we have a Kafka source. Checkpointing is enabled with a period of 5 seconds, and we use the FS backend (Hadoop). Now imagine we have a tumbling window of 10 minutes. For simplicity, say we are counting all elements arriving within those 10 minutes, with something like this:

    class Count extends FoldFunction[Event, Long] {
      override def fold(accumulator: Long, value: Event): Long = {
        accumulator + 1
      }
    }

So we have:

    source
      .window(<Tumbling>)
      .apply(0L, new Count(), <WindowFunction>)

In the first 2 minutes, 10 events arrive. Then we stop the stream/task/job, or it fails, and it is restarted. What will the state of the fold function be? Will it be 10 and resume from there, or will it be 0? This matters because imagine we have a window of 1 day and the job fails mid-day. How will it resume?

Best Regards
Hi Daniel,

Flink checkpoints the state of all operators (in your case to HDFS). Flink has several APIs for dealing with state in user functions: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html The window operator also uses these APIs internally.

Let me know if you need anything more.

Cheers,
Aljoscha

On Thu, 3 Nov 2016 at 19:43 Daniel Santos <[hidden email]> wrote:
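A minimal, self-contained model of what this means for the original question (plain Java, no Flink dependency; the class and method names are hypothetical and this is not how Flink is implemented internally). It assumes, as with Flink's Kafka consumer, that the source's read offsets are stored in the same checkpoint as the window state, and it checkpoints every N events in place of the 5-second interval. After a simulated crash, the last completed checkpoint is restored and the events read after it are replayed, so the count still ends at 10 rather than restarting from 0.

```java
import java.util.List;

// Toy model of checkpoint-based recovery for a counting fold over one window.
// Hypothetical names; no Flink dependency.
class CheckpointRecoverySketch {

    // What a checkpoint captures: the source position and the window's accumulator, together.
    static final class Snapshot {
        final int offset;
        final long count;

        Snapshot(int offset, long count) {
            this.offset = offset;
            this.count = count;
        }
    }

    // Folds over `events` with accumulator + 1, checkpointing every `checkpointEvery` events.
    // If `failAfter` >= 0, simulates one crash when the source reaches that offset.
    static long run(List<String> events, int checkpointEvery, int failAfter) {
        Snapshot lastCheckpoint = new Snapshot(0, 0L); // start: offset 0, fold initial value 0
        int offset = 0;
        long count = 0L;
        boolean crashed = false;

        while (offset < events.size()) {
            if (!crashed && offset == failAfter) {
                // Crash: in-memory state is lost. Restore the last completed checkpoint,
                // which also rewinds the source so later events are read again.
                crashed = true;
                offset = lastCheckpoint.offset;
                count = lastCheckpoint.count;
                continue;
            }
            count = count + 1; // the fold step from the Count example
            offset++;
            if (offset % checkpointEvery == 0) {
                lastCheckpoint = new Snapshot(offset, count);
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> events = List.of("e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10");
        System.out.println(run(events, 3, -1)); // no failure: 10
        System.out.println(run(events, 3, 7));  // crash after 7 events, restore at offset 6: still 10
    }
}
```

Restoring the accumulator without also rewinding the offset would drop the events read between the checkpoint and the crash; storing both in one snapshot is what keeps the count consistent.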
Hello Aljoscha,

Thank you for your reply. However, from reading the docs, I believe a user function has to be a RichFunction if it wants to keep state, and a RichFunction cannot be used with a window fold. For instance, looking at the Flink 1.1.3 source (the version I'm currently using), class WindowedStream.java contains the following snippet:

    public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function, TypeInformation<R> resultType) {
        if (foldFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
        }
        if (windowAssigner instanceof MergingWindowAssigner) {
            throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
        }
        ...

I can see that the window operator creates a FoldingStateDescriptor, so, as you said, it uses the APIs you described. However, I can't see how everything fits together. For instance, the Count class described here can only extend FoldFunction, not RichFoldFunction, so how does Flink keep track of the accumulator? From my tests it seems that it does not: every time the program/stream/job is restarted, the accumulator starts from the initial value.

Kind Regards,
Daniel Santos

On 11/04/2016 11:01 AM, Aljoscha Krettek wrote:
Hi,

The state of the window is kept by the WindowOperator (which uses the state descriptor you mentioned to access the state). The FoldFunction does not keep the state itself; it is only used to update the state inside the WindowOperator, if you will.

When you say restart, are you talking about stopping the job manually and then restarting it? In that case I expect the state to be reset. Flink performs checkpoints of the state so that it can recover from failures; these checkpoints, however, don't survive stopping a job. If you want to persist the state across stopping/restarting, you should look into savepoints: https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/savepoints.html

Cheers,
Aljoscha

On Fri, 4 Nov 2016 at 16:40 Daniel Santos <[hidden email]> wrote:
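The split between the fold function and the operator's state can be sketched in plain Java. This is a toy model with hypothetical names, not Flink's actual WindowOperator: the fold is a pure `(accumulator, value) -> accumulator` function, while a separate operator object owns one accumulator per window. A fresh operator, as after stopping and restarting a job without a savepoint, starts from the initial value; an operator restored from a saved copy of its state, as when resuming from a savepoint, continues from 10. Window starts are computed with a simple modulo, a simplification of Flink's window assignment.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical names, no Flink dependency): the fold function holds no state;
// the per-window accumulator lives in state owned by the operator.
class WindowStateSketch {

    // Same shape as a fold: (accumulator, value) -> new accumulator.
    interface Fold<T, R> {
        R fold(R accumulator, T value);
    }

    // Stands in for the window operator: one accumulator per window start,
    // updated by calling the stateless fold function.
    static final class WindowOperator<T> {
        private final Map<Long, Long> windowState = new HashMap<>();
        private final Fold<T, Long> fold;
        private final long initialValue;
        private final long windowSizeMillis;

        WindowOperator(Fold<T, Long> fold, long initialValue, long windowSizeMillis) {
            this.fold = fold;
            this.initialValue = initialValue;
            this.windowSizeMillis = windowSizeMillis;
        }

        void process(T value, long timestampMillis) {
            long windowStart = timestampMillis - (timestampMillis % windowSizeMillis);
            long acc = windowState.getOrDefault(windowStart, initialValue);
            windowState.put(windowStart, fold.fold(acc, value));
        }

        long current(long windowStart) {
            return windowState.getOrDefault(windowStart, initialValue);
        }

        // A savepoint is modeled as a copy of the operator state taken on request.
        Map<Long, Long> savepoint() {
            return new HashMap<>(windowState);
        }

        void restore(Map<Long, Long> saved) {
            windowState.clear();
            windowState.putAll(saved);
        }
    }

    public static void main(String[] args) {
        Fold<String, Long> count = (acc, value) -> acc + 1;
        long tenMinutes = 10 * 60 * 1000L;

        WindowOperator<String> op = new WindowOperator<>(count, 0L, tenMinutes);
        for (int i = 0; i < 10; i++) {
            op.process("event", i * 1000L); // 10 events in the first window
        }
        Map<Long, Long> saved = op.savepoint();

        // Restart without a savepoint: a new operator with empty state.
        WindowOperator<String> fresh = new WindowOperator<>(count, 0L, tenMinutes);
        System.out.println(fresh.current(0L)); // 0

        // Restart from the savepoint: the accumulator resumes.
        WindowOperator<String> resumed = new WindowOperator<>(count, 0L, tenMinutes);
        resumed.restore(saved);
        System.out.println(resumed.current(0L)); // 10
    }
}
```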
Hi,

Thank you very much. I see; OK, it makes sense. I believe there is a catch with parallelism, though. Say one takes a savepoint and then changes the parallelism. I believe the job won't start from the last savepoint; is that correct? On versions (> 1.2), will it start afresh?

Daniel Santos

On 11/04/2016 05:54 PM, Aljoscha Krettek wrote:
On 7 November 2016 at 13:06:16, Daniel Santos ([hidden email]) wrote:
> I believe the job won't start from the last savepoint is that correct,
> on versions ( > 1.2 ), it will start afresh ?

Yes, with 1.2 you will be able to take a savepoint and then resume from that savepoint with different parallelism. :-)

– Ufuk
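Rescaling from a savepoint in 1.2 relies on keyed state (which includes window state) being partitioned into a fixed number of key groups, bounded by the job's maximum parallelism, rather than directly by operator instance. The sketch below models that idea in plain Java. It is loosely based on Flink's `KeyGroupRangeAssignment`, but the class name is hypothetical and it hashes with `Math.floorMod(key.hashCode(), ...)` instead of Flink's murmur hash, so concrete assignments differ from what Flink computes. What it shows: a key's group never changes, and changing the parallelism only changes which instance owns which contiguous range of groups.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of key-group based rescaling (hypothetical class name).
// Flink applies a murmur hash to key.hashCode(); this sketch uses floorMod for brevity.
class KeyGroupSketch {

    // A key maps to one key group; this does not depend on the current parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each operator instance owns a contiguous range of key groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Lists the key groups owned by each operator instance for a given parallelism.
    static List<List<Integer>> assignment(int maxParallelism, int parallelism) {
        List<List<Integer>> owners = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            owners.add(new ArrayList<>());
        }
        for (int g = 0; g < maxParallelism; g++) {
            owners.get(operatorIndexFor(g, maxParallelism, parallelism)).add(g);
        }
        return owners;
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        System.out.println(assignment(maxParallelism, 2)); // [[0, 1, 2, 3], [4, 5, 6, 7]]
        System.out.println(assignment(maxParallelism, 4)); // [[0, 1], [2, 3], [4, 5], [6, 7]]
    }
}
```

Because state is saved per key group, a savepoint taken at parallelism 2 can be split along these ranges when the job is restarted at parallelism 4, without rehashing individual keys.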
Hi,
Thank you Ufuk.

Hmm, out of curiosity: is there any idea when 1.2 will be released?

Best Regards,
Daniel Santos

On November 7, 2016 12:45:51 PM GMT+00:00, Ufuk Celebi <[hidden email]> wrote:
The goal is to release it before the end of this year. For this to happen, the first release candidate would need to be available by the end of November/beginning of December.
There is an ongoing discussion here: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Schedule-and-Scope-for-Flink-1-2-td14062.html#a14306

– Ufuk

On 7 November 2016 at 14:05:27, Daniel Santos ([hidden email]) wrote: