Missing MapState when Timer fires after restored state


Re: Missing MapState when Timer fires after restored state

Stefan Richter
Hi,

I had a look at the logs from the restoring job and couldn’t find anything suspicious in them. Everything looks as expected and the state files are properly found and transferred from S3. We are including rescaling in some end-to-end tests now, and then let’s see what happens.
If you say that you can reproduce the problem, does that mean reproduce from the single existing checkpoint or also creating other problematic checkpoints? I am asking because maybe a log from the job that produces the problematic checkpoint might be more helpful. You can create a ticket if you want.

Best,
Stefan

Am 18.05.2018 um 09:02 schrieb Juho Autio <[hidden email]>:

I see. I appreciate keeping this option available even if it's "beta". The current situation could be documented better, though.

As long as rescaling from checkpoint is not officially supported, I would put it behind a flag similar to --allowNonRestoredState. The flag could be called --allowRescalingRestoredCheckpointState, for example. This would make sure that users are aware that what they're using is experimental and might have unexpected effects.

As for the bug I faced, indeed I was able to reproduce it consistently. And I have provided TRACE-level logs personally to Stefan. If there is no Jira ticket for this yet, would you like me to create one?

On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <[hidden email]> wrote:
Hi,

>
> This raises a couple of questions:
> - Is it a bug though, that the state restoring goes wrong like it does for my job? Based on my experience it seems like rescaling sometimes works, but then you can have these random errors.

If there is a problem, I would still consider it a bug because it should work correctly.

> - If it's not supported properly, why not refuse to restore a checkpoint if it would require rescaling?

It should work properly, but I would prefer to keep this at the level of a "hidden feature" until it has gotten some more exposure and some questions about the future of the differences between savepoints and checkpoints are resolved.

> - We have sometimes had Flink jobs where the state has become so heavy that cancelling with a savepoint times out & fails. Incremental checkpoints still work because they don't time out as long as the state grows linearly. In that case, if we want to scale up (for example to enable successful savepoint creation ;) ), the only thing we can do is restore from the latest checkpoint. But then we have no way to scale up by increasing the cluster size: we can't create a savepoint with the smaller cluster, but on the other hand we can't restore a checkpoint to a bigger cluster if rescaling from a checkpoint is not supposed to be relied on. So in this case we're stuck and forced to start from an empty state?

IMO there is a very good chance that this will simply become a normal feature in the near future.

Best,
Stefan




Re: Missing MapState when Timer fires after restored state

gerryzhou
In reply to this post by Juho Autio
Hi Juho,
would you like to try out the latest RC (http://people.apache.org/~trohrmann/flink-1.5.0-rc4/) to rescale the job from the "problematic" checkpoint? The latest RC includes a fix for a potential silent data loss. If that is the cause, you will see a different exception when you try to recover your job.

Best, Sihua

Re: Missing MapState when Timer fires after restored state

Juho Autio
In reply to this post by Stefan Richter
> If you say that you can reproduce the problem, does that mean reproduce from the single existing checkpoint

Yes.

I haven't tried creating another checkpoint and rescaling from it. I can try that.

> We are including rescaling in some end-to-end tests now, and then let’s see what happens.

If I understood correctly, there is some difference in how timers & other state are written. It might be interesting to include a test with state that holds both timers and keyed MapState, like the code snippet in my original message. I believe this is a common usage pattern anyway. The test should verify that both timers & MapState are restored consistently.
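
For concreteness, here is a minimal sketch of the pattern described above. All class, field, and key names are hypothetical (they are not taken from the job in this thread), and stateRetentionMillis is an assumed retention value: a ProcessFunction applied on a keyed stream that writes MapState in processElement() and reads it back when its event-time timer fires.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: keyed MapState plus an event-time timer per element.
// Apply on a keyed stream, e.g. stream.keyBy(...).process(new TimerAndMapStateSketch(3600_000L)).
public class TimerAndMapStateSketch extends ProcessFunction<String, String> {

    private final long stateRetentionMillis;            // assumed retention value
    private transient MapState<String, String> attributes;

    public TimerAndMapStateSketch(long stateRetentionMillis) {
        this.stateRetentionMillis = stateRetentionMillis;
    }

    @Override
    public void open(Configuration parameters) {
        attributes = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("attributes", String.class, String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // remember something about the event (assumes event-time timestamps are assigned)
        attributes.put("lastSeen", String.valueOf(ctx.timestamp()));
        // schedule a timer that will read the MapState back later
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // the reported symptom: after rescaling from a checkpoint the timer fires,
        // but the MapState entry is missing, so get() returns null
        String lastSeen = attributes.get("lastSeen");
        out.collect("lastSeen=" + lastSeen);
    }
}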


Re: Missing MapState when Timer fires after restored state

Juho Autio
In reply to this post by gerryzhou
Thanks Sihua, I'll give that RC a try.


Re: Missing MapState when Timer fires after restored state

Juho Autio

It hits the same problem.

Btw, why is this error logged on INFO level?

2018-05-18 09:03:52,595 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - PlatformIDsProcessFunction -> AppIdFilter([MyApp]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, FooEventMapper -> ThreadPoolFooGateway (capacity=500) -> Sink: ResponseKafkaSink) (8/16) (c2c102bcf4e1c7f523a2d00b81ed1a23) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
2018-05-18 09:03:52,596 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job FooHttpStream (a54a9210341fd26d210122986f2be844) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
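
As a side note on the trace above: "NumberFormatException: null" is exactly what Long.parseLong produces when it is handed a null String, which is consistent with the MapState entry being missing when the timer fires. A minimal standalone illustration (not taken from the job, names are placeholders):

// Long.parseLong(null) throws java.lang.NumberFormatException: null,
// and null is what MapState.get() returns for a missing entry.
public class NumberFormatDemo {
    public static void main(String[] args) {
        String value = null;   // stands in for a MapState lookup that found nothing
        Long.parseLong(value); // throws java.lang.NumberFormatException: null
    }
}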


Re: Missing MapState when Timer fires after restored state

gerryzhou
Hi Juho,
thanks for trying this out. I'm running out of ideas myself now... Let me give a brief summary.

- Have we reached a consensus that the map state is lost when restoring with rescaling? (Because you can restore successfully without rescaling.)
- The timer state is restored correctly, because for timers, restoring from a checkpoint is no different from restoring from a savepoint. (@Stefan, please correct me if I'm wrong.)

And @Juho, have you tried rescaling the job with a different parallelism (not always with 16)?

Best, Sihua


Re: Missing MapState when Timer fires after restored state

gerryzhou
Hi Juho & Stefan,
just a quick summary: the good news is that I think I found the bug, the bad news is that we are currently at RC4...
The bug is here:

try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {

    int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
    byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
    for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
        startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
    }

    iterator.seek(startKeyGroupPrefixBytes);

    while (iterator.isValid()) {

        int keyGroup = 0;
        for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
            keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
        }

        if (stateBackend.keyGroupRange.contains(keyGroup)) {
            stateBackend.db.put(targetColumnFamilyHandle,
                iterator.key(), iterator.value());
        }

        iterator.next();
    }
}

For every state handle, to get the target data we _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be INVALID immediately if the state handle's start key group is bigger than _state.keyGroupRange.getStartKeyGroup()_. Then data is lost...
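
For readers following along, here is a small standalone sketch of the key-group prefix arithmetic used in the quoted snippet (values chosen arbitrarily, not Flink code): the start key group is serialized big-endian into keyGroupPrefixBytes bytes for the seek, and a key group is decoded back from the first bytes of each iterator key. The & 0xFF mask below only keeps this standalone demo correct for byte values above 127.

import java.util.Arrays;

public class KeyGroupPrefixDemo {
    public static void main(String[] args) {
        // assumed values: 2 prefix bytes and start key group 42
        int keyGroupPrefixBytes = 2;
        int startKeyGroup = 42;

        // encode the start key group big-endian, as done for the seek above
        byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
        for (int j = 0; j < keyGroupPrefixBytes; ++j) {
            startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((keyGroupPrefixBytes - j - 1) * Byte.SIZE));
        }
        System.out.println(Arrays.toString(startKeyGroupPrefixBytes)); // [0, 42]

        // decode a key group back from the first bytes of a key
        int keyGroup = 0;
        for (int j = 0; j < keyGroupPrefixBytes; ++j) {
            keyGroup = (keyGroup << Byte.SIZE) + (startKeyGroupPrefixBytes[j] & 0xFF);
        }
        System.out.println(keyGroup); // 42
    }
}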

@Stefan, this still needs your double check, please correct me if I'm wrong.

Best, Sihua


Re: Missing MapState when Timer fires after restored state

gerryzhou
Sorry for the incorrect information, that's not the case.

Best, Sihua

Re: Missing MapState when Timer fires after restored state

gerryzhou
Hi Juho,
I tried multiple times following the simple code you provided, but still can't reproduce the bug you met. There's one more question I'd like to confirm with you: is stateRetentionMillis a fixed (final) field, or might it be changed under some condition?

Best, Sihua