Hi,

If you say that you can reproduce the problem, does that mean reproducing from the single existing checkpoint, or also creating other problematic checkpoints? I am asking because a log from the job that produces the problematic checkpoint might be more helpful. You can create a ticket if you want.

Best,
Stefan
In reply to this post by Juho Autio
Hi Juho,

Would you like to try out the latest RC (http://people.apache.org/~trohrmann/flink-1.5.0-rc4/) to rescale the job from the "problematic" checkpoint? The latest RC includes a fix for potential silent data loss. If that is the cause, you will see a different exception when you try to recover your job.

Best,
Sihua
On 05/18/2018 15:02, [hidden email] wrote:
In reply to this post by Stefan Richter
> If you say that you can reproduce the problem, does that mean reproduce from the single existing checkpoint

Yes.

> We are including rescaling in some end-to-end tests now and then let's see what happens.

If I understood correctly, there is some difference in how timers & other state are written. It might be interesting if you would include a test with state that holds both timers and keyed MapState, like the code snippet in my original message (a sketch of that pattern is below). I believe this is a common usage pattern anyway. The test should verify the transactionality of the restore, i.e. that timers & MapState are restored consistently together.

On Fri, May 18, 2018 at 10:51 AM, Stefan Richter <[hidden email]> wrote:
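To illustrate, a minimal sketch of the pattern I mean (not my actual snippet; the class name, state names and the retention value here are illustrative assumptions):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative sketch only: a keyed function that uses both MapState and
// event-time timers, the combination whose restore behavior is in question.
public class TimerAndMapStateFunction extends KeyedProcessFunction<String, String, String> {

    // Assumed fixed retention, not taken from the actual job.
    private static final long RETENTION_MILLIS = 60_000L;

    private transient MapState<String, String> values;

    @Override
    public void open(Configuration parameters) {
        values = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("values", String.class, String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Store per-key data and schedule a timer that will read it back later.
        // Assumes event-time timestamps are assigned upstream.
        values.put("timestamp", String.valueOf(ctx.timestamp()));
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + RETENTION_MILLIS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // If timers and MapState are restored consistently, this entry must
        // still exist when the timer fires; a rescale test should assert that.
        out.collect(ctx.getCurrentKey() + "@" + Long.parseLong(values.get("timestamp")));
    }
}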
In reply to this post by gerryzhou
Thanks Sihua, I'll give that RC a try.
On Fri, May 18, 2018 at 10:58 AM, sihua zhou <[hidden email]> wrote:
Tested with http://people.apache.org/~trohrmann/flink-1.5.0-rc4/flink-1.5.0-bin-hadoop27-scala_2.11.tgz. It hits the same problem. Btw, why is this error logged at INFO level?

2018-05-18 09:03:52,595 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - PlatformIDsProcessFunction -> AppIdFilter([MyApp]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, FooEventMapper -> ThreadPoolFooGateway (capacity=500) -> Sink: ResponseKafkaSink) (8/16) (c2c102bcf4e1c7f523a2d00b81ed1a23) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
    at java.lang.Long.parseLong(Long.java:552)
    at java.lang.Long.parseLong(Long.java:631)
    at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
2018-05-18 09:03:52,596 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job FooHttpStream (a54a9210341fd26d210122986f2be844) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
    at java.lang.Long.parseLong(Long.java:552)
    at java.lang.Long.parseLong(Long.java:631)
    at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more

On Fri, May 18, 2018 at 11:06 AM, Juho Autio <[hidden email]> wrote:
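Note that java.lang.Long.parseLong throws exactly "NumberFormatException: null" when given a null string, so the trace above fits a timer firing after its MapState entry disappeared. A hypothetical reconstruction of the failing read (the real EnrichEventProcessFunction.java:136 is not shown in this thread):

// Hypothetical reconstruction: if the MapState entry was lost on rescale,
// get(...) returns null and parseLong throws NumberFormatException: null,
// matching the "Caused by" line in the trace above.
public class ParseNullDemo {
    public static void main(String[] args) {
        String storedTimestamp = null;              // what MapState#get returns for a lost entry
        long ts = Long.parseLong(storedTimestamp);  // java.lang.NumberFormatException: null
        System.out.println(ts);
    }
}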
Hi Juho, thanks for trying this out. I'm running out of ideas myself now... Let's do a brief summary:

- Have we reached a consensus that the map state is lost when restoring with rescaling? (Because you can restore successfully without rescaling.)
- The timer state is correctly restored, because for timers, restoring from a checkpoint is no different from restoring from a savepoint. (@Stefan, please correct me if I'm wrong.)

And @Juho, have you tried to rescale the job with a different parallelism (not always with 16)?

Best,
Sihua

On 05/18/2018 17:14, [hidden email] wrote:
Hi Juho & Stefan, just a quick summary: the good news is that I think I found the bug, the bad news is that we are already at RC4... The bug is here:

try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {

For every state handle, to get the target data we _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be INVALID immediately if the state handle's start key group is bigger than _state.keyGroupRange.getStartKeyGroup()_. Then, data is lost....

@Stefan, this still needs your double check, please correct me if I'm wrong.

Best,
Sihua
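To make the seek concern concrete, a simplified paraphrase of that restore loop (not the exact Flink source; serializeKeyGroup, isInTargetKeyGroupRange, targetDb and targetColumnFamilyHandle are illustrative stand-ins):

// Simplified paraphrase of the restore loop under discussion, NOT the exact
// Flink source: data from each restored state handle is copied into the
// target DB, starting at the target range's start key group.
try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {

    byte[] startKeyGroupPrefixBytes =
            serializeKeyGroup(stateBackend.keyGroupRange.getStartKeyGroup());

    // seek() positions the iterator at the first key >= the prefix. If this
    // state handle contains no such key, isValid() is false immediately and
    // the loop below copies nothing from the handle.
    iterator.seek(startKeyGroupPrefixBytes);

    while (iterator.isValid() && isInTargetKeyGroupRange(iterator.key())) {
        targetDb.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
        iterator.next();
    }
}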
On 05/18/2018 17:29, [hidden email] wrote:
Sorry for the incorrect information, that (the suspected seek bug above) is not the case.

Best,
Sihua
On 05/19/2018 07:58, [hidden email] wrote:
Hi Juho, I tried multiple times, following the simple code you provided, but still can't reproduce the bug you met. There's one more question I'd like to confirm with you: is stateRetentionMillis a fixed (final) field, or might it be changed under some condition?
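The reason I ask: if stateRetentionMillis can change, state could be cleared before an older, longer timer fires, which would also produce the null read with no checkpoint bug involved. A hypothetical sketch, reusing the shape of the function sketched earlier in this thread (field and state names are illustrative):

// Hypothetical scenario behind the question, not the actual job: the timer
// was registered under the old retention value, but cleanup later uses a
// shorter one, so the entry is gone before the timer fires.
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
    values.put("timestamp", String.valueOf(ctx.timestamp()));
    // Uses whatever stateRetentionMillis happens to be *now*:
    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    // If another code path already cleared the entry because the retention
    // shrank in the meantime, this returns null and parseLong fails, even
    // though the checkpoint itself was restored correctly.
    long ts = Long.parseLong(values.get("timestamp"));
}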
On 05/19/2018 08:19, [hidden email] wrote: