Hello community,
Is it possible to know programmatically how many times my Flink streaming job has restarted since it started running?

My use case is this: I have a unit test that uses checkpointing, and I throw an exception in a MapFunction for a given period, for example for the next 2 seconds. Because Flink restarts the job and I have checkpoints, I can recover the state, and after 2 seconds I no longer throw any exception. Then I would like to know how many times the job was restarted.

Thanks,
Felipe
|
Hi Felipe,
You can use getRuntimeContext().getAttemptNumber() [1] (but beware that depending on the configuration only a pipeline region can be restarted, not the whole job).

But if all you want is to check whether it's a first attempt or not, you can also call context.isRestored() from initializeState() [2]

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--

Regards,
Roman

On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez <[hidden email]> wrote:
|
Cool!

I did it using this example https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state because I don't have a keyed stream on the specific operator for which I want to count the number of restarts (yes, I am using version 1.4, unfortunately).

Because I need to test it in an integration test, I am using a side output (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html) to attach a sink. I am not sure if you have a better idea for testing the restarts in an integration test. If you have a simple idea, please tell me :). This is the way I solved it.

Thanks
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <[hidden email]> wrote:
|
I just realised that the ProcessFunction alone is enough. I don't need the CheckpointedFunction.

On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <[hidden email]> wrote:
|
You can also use accumulators [1] to collect the number of restarts
(and then access it via client); but side outputs should work as well.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters

Regards,
Roman

On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez <[hidden email]> wrote:
|
So, I was trying to improve this by using the CheckpointedFunction as shown here [1]. But the documentation of the isRestored() method says [2]:

"Returns true, if state was restored from the snapshot of a previous execution. This returns always false for stateless tasks."

It is weird because I am extending a ProcessFunction, which is a RichFunction.

public class AuctionExceptionSimulatorProcessFunction extends ProcessFunction<KeyedReportingData, KeyedReportingData>
        implements CheckpointedFunction {
...

In the end, I cannot rely on isRestored(). Do you know what could be wrong? I used the same implementation approach as in [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <[hidden email]> wrote:
|
Does your ProcessFunction have state? If not, that would be in line with the documentation.

Also, which Flink version are you using? Before Flink 1.13, empty state was omitted, so I could imagine that `isRestored()` would return false there, but it should now also return true for empty state.

On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <[hidden email]> wrote:
|
Yes, I have state on the ProcessFunction. I tested it on a stand-alone cluster and it returns true when the application recovers. However, in integration tests it does not return true. I am using Flink 1.4. Do you know where the Flink 1.13 release notes (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that I cannot see `isRestored()` return true in integration tests?

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <[hidden email]> wrote:
|
Hi,
Could you please share the test code?

I think the returned value might depend on the level on which the tests are executed. If it's a regular job then it should return the correct value (as with cluster). If the environment in which the code is executed is mocked then it can be false.

Regards,
Roman

On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez <[hidden email]> wrote:
|
Sure, here it is. Nothing is mocked. I double-checked.

UnitTestClass {.....
    protected static LocalFlinkMiniCluster flink;

    @BeforeClass
    public static void prepare() {
        flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
        flink.start();

        TestStreamEnvironment.setAsContext(flink, PARALLELISM);
    }

    private static Configuration getFlinkConfiguration() {
        Configuration flinkConfig = new Configuration();
        flinkConfig.setInteger("local.number-taskmanager", 1);
        flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
        flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
        flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
        try {
            flinkConfig.setString("state.checkpoints.dir", "file://" + tempFolder.newFolder().getAbsolutePath());
        } catch (IOException e) {
            throw new RuntimeException("error in flink cluster config", e);
        }
        return flinkConfig;
    }

The class in which I check whether the job was restarted:

public class ExceptionSimulatorProcessFunction extends ProcessFunction<Object..., Object...>
        implements CheckpointedFunction {

    final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
    };
    private transient ListState<Long> restartsState;
    private Long restartsLocal;
    ...
    @Override
    public void processElement(Object value, Context ctx, Collector<Object> out) throws Exception {
        this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;

        // If current time is less than the reference time ahead AND we have the poison auction an exception will throw
        if (this.currentTimeMillis < this.referenceTimeMillisAhead && POISON__TRANSACTION_ID.equals(value.toString())) {

            LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
                    sdfMillis.format(new Date(this.currentTimeMillis)),
                    sdfMillis.format(new Date(this.referenceTimeMillisAhead)));

            throw new SimulatedException("Transaction ID: " + value.toString() +
                    " not allowed. This is a simple exception for testing purposes.");
        }
        out.collect(value);

        // counts the restarts
        if (restartsState != null) {
            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            Long attemptsRestart = 0L;
            if (restoreList != null && !restoreList.isEmpty()) {
                attemptsRestart = Collections.max(restoreList);
                if (restartsLocal < attemptsRestart) {
                    restartsLocal = attemptsRestart;
                    ctx.output(outputTag, Long.valueOf(attemptsRestart));
                }
            }
            LOG.info("Attempts restart: " + attemptsRestart);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {}

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("restarts", Long.class));

        if (context.isRestored()) {
            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            if (restoreList == null || restoreList.isEmpty()) {
                restartsState.add(1L);
                LOG.info("restarts: 1");
            } else {
                Long max = Collections.max(restoreList);
                LOG.info("restarts: " + max);
                restartsState.add(max + 1);
            }
        } else {
            LOG.info("restarts: never restored");
        }
    }
}

On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <[hidden email]> wrote:
|
Thanks for sharing,
I think the problem is that restartsState is never updated:
- on the first attempt, context.isRestored() returns false (and "never restored" is logged)
- on subsequent attempts, it again returns false, because the state was never updated before

Adding
if (!context.isRestored()) { restartsState.add(0L); }
should solve the problem
(it's also better to use state.update instead of state.add if only max is needed).

Regards,
Roman

On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez <[hidden email]> wrote:
|
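To make the diagnosis above concrete, here is a minimal plain-Java sketch of the counter logic, with no Flink dependency. Everything in it is hypothetical: `RestartCounterModel`, its `initializeState` method and the `List<Long>` standing in for the operator `ListState<Long>` are illustrative names, and the `restored` flag only mimics what `context.isRestored()` reports. It also assumes that a checkpoint completes between attempts; if none does, there is nothing to restore and the count stays at 0.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of the restart counter; hypothetical, not Flink API. */
final class RestartCounterModel {

    /**
     * Mimics initializeState() and returns the new contents of the "restarts" state.
     * restored    stands in for context.isRestored()
     * checkpoint  stands in for the list recovered from the last checkpoint
     */
    static List<Long> initializeState(boolean restored, List<Long> checkpoint) {
        List<Long> state = new ArrayList<>();
        if (!restored) {
            // First attempt: seed the state so the next checkpoint has a value to persist.
            state.add(0L);
        } else {
            // Recovery: replace the list with max + 1 ("update" rather than "add").
            long previous = checkpoint.isEmpty() ? 0L : Collections.max(checkpoint);
            state.add(previous + 1);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Long> state = initializeState(false, Collections.<Long>emptyList());
        System.out.println("attempt 0: " + state);
        // Two failures, each assumed to happen after a checkpoint captured the current state.
        for (int attempt = 1; attempt <= 2; attempt++) {
            state = initializeState(true, state);
            System.out.println("attempt " + attempt + ": " + state);
        }
    }
}
```

Keeping a single value in the list is what the "update instead of add" remark amounts to: the state then holds the restart count directly and no maximum over a growing list is needed.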
No, it didn't work.

context.isRestored() returns true when I run the application on the Flink standalone cluster and it is recovering after a failure. When I do the same in an integration test, it does not return true after a failure. I mean, I can log the exception that is causing the failure, and initializeState() is called after a failure, but context.isRestored() is false again. I also tried to set the state to 0 on the first attempt with "if (!context.isRestored()) { restartsState.add(0L); }" and it does not work.
I think the problem is neither the ListState that I am using nor context.isRestored(). It is "context.getOperatorStateStore()", which is always null, but only in integration tests. Using the code below, I see "restarts: 0" in the logs twice, before and after the failure.

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // unit tests does not open OperatorStateStore
    if (context.getOperatorStateStore() != null) {
        restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("restarts", Long.class));

        List<Long> restoreList = Lists.newArrayList(restartsState.get());
        if (restoreList == null || restoreList.isEmpty()) {
            restartsState.add(0L);
            LOG.info("restarts: 0");
        } else {
            Long max = Collections.max(restoreList);
            LOG.info("restarts: " + max);
            restartsState.add(max + 1);
        }
    }
}

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <[hidden email]> wrote:
|
I investigated a little bit more. I created the same POC on Flink version 1.13. I have this ProcessFunction in which I want to count the number of times it recovers. I tested with ListState and ValueState, and it seems that during the integration test (only in the integration test) the process hangs in the open() or initializeState() method.

https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java

Here is my integration test: https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154

If I comment out the ListState or the ValueState, the integration test works. It first throws an exception and I see it in the output. Then after 2 seconds it stops throwing exceptions. Then I compare the outputs and they match. But I don't have a way to count how many restarts happened during the integration test. I can only count the restarts when I run the application on a stand-alone Flink cluster.

I don't know. Maybe I have to configure some parameters to work with state in integration tests?

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez <[hidden email]> wrote:
|
I tried to run the test that you mentioned
(WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev. 6f08d0a.

In IDE, I see that:
- checkpoint is never triggered (sentence is too short, checkpoint pause and interval are too large)
- exception is never thrown, so the job never restarted (currentTimeMillis is incremented but referenceTimeMillisAhead is not)

When I add sleep between each element, set 10ms interval, 0ms pause and introduce some random exception, I do see
2021-06-18 17:37:53,241 INFO org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess - Attempts restart: 1
in the logs.

These settings probably differ on the cluster and there is some unrelated exception which causes a restart.

Regards,
Roman

On Fri, Jun 18, 2021 at 12:20 PM Felipe Gutierrez <[hidden email]> wrote:
max = Collections.max(restoreList); >>> > LOG.info("restarts: " + max); >>> > restartsState.add(max + 1); >>> > } >>> > } else { >>> > LOG.info("restarts: never restored"); >>> > } >>> > } >>> > } >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <[hidden email]> wrote: >>> >> >>> >> Hi, >>> >> >>> >> Could you please share the test code? >>> >> >>> >> I think the returned value might depend on the level on which the >>> >> tests are executed. If it's a regular job then it should return the >>> >> correct value (as with cluster). If the environment in which the code >>> >> is executed is mocked then it can be false. >>> >> >>> >> Regards, >>> >> Roman >>> >> >>> >> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez >>> >> <[hidden email]> wrote: >>> >> > >>> >> > Yes, I have state on the ProcessFunction. I tested it on a stand-alone cluster and it returns true when the application recovers. However, in integration tests it does not returns true. I am using Flink 1.4. Do you know where it is saying at Flink release 1.13 (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) that I cannot see `isRestored()` equals true on integration tests? >>> >> > >>> >> > -- >>> >> > -- Felipe Gutierrez >>> >> > -- skype: felipe.o.gutierrez >>> >> > >>> >> > >>> >> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <[hidden email]> wrote: >>> >> >> >>> >> >> Does your ProcessFunction has state? If not it would be in line with the documentation. >>> >> >> >>> >> >> Also which Flink version are you using? Before Flink 1.13 empty state was omitted so I could imagine that `isRestored()` would return false but it should actually now also return true for empty state. >>> >> >> >>> >> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <[hidden email]> wrote: >>> >> >>> >>> >> >>> So, I was trying to improve by using the CheckpointedFunction as it shows here [1]. 
But the method isRestored() says in its documentation [2]: >>> >> >>> >>> >> >>> "Returns true, if state was restored from the snapshot of a previous execution. This returns always false for stateless tasks." >>> >> >>> >>> >> >>> It is weird because I am extending a ProcessFunction which is a RichFunction. >>> >> >>> >>> >> >>> public class AuctionExceptionSimulatorProcessFunction extends ProcessFunction<KeyedReportingData, KeyedReportingData> >>> >> >>> implements CheckpointedFunction { >>> >> >>> ... >>> >> >>> >>> >> >>> In the end, I cannot rely on the "isRestored()". Do you know what could be wrong? I used the same implementation method of [1]. >>> >> >>> >>> >> >>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction >>> >> >>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored-- >>> >> >>> >>> >> >>> >>> >> >>> -- >>> >> >>> -- Felipe Gutierrez >>> >> >>> -- skype: felipe.o.gutierrez >>> >> >>> >>> >> >>> >>> >> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <[hidden email]> wrote: >>> >> >>>> >>> >> >>>> You can also use accumulators [1] to collect the number of restarts >>> >> >>>> (and then access it via client); but side outputs should work as well. >>> >> >>>> >>> >> >>>> [1] >>> >> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters >>> >> >>>> >>> >> >>>> Regards, >>> >> >>>> Roman >>> >> >>>> >>> >> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez >>> >> >>>> <[hidden email]> wrote: >>> >> >>>> > >>> >> >>>> > I just realised that only the ProcessFunction is enough. I don't need the CheckpointFunction. >>> >> >>>> > >>> >> >>>> > >>> >> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <[hidden email]> wrote: >>> >> >>>> >> >>> >> >>>> >> Cool! 
>>> >> >>>> >> >>> >> >>>> >> I did using this example https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state because I don't have a keyed stream on the specific operator that I want to count the number of restarts. (yes I am using version 1.4 unfortunately). >>> >> >>>> >> >>> >> >>>> >> Because I need to test it in an integration test I am using a side output (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html) to attach a sink. I am not sure if you have a better idea to test the restarts on an integration test. If you have a simple idea please tell me :). This was the way that I solved.... >>> >> >>>> >> >>> >> >>>> >> Thanks >>> >> >>>> >> Felipe >>> >> >>>> >> >>> >> >>>> >> -- >>> >> >>>> >> -- Felipe Gutierrez >>> >> >>>> >> -- skype: felipe.o.gutierrez >>> >> >>>> >> >>> >> >>>> >> >>> >> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <[hidden email]> wrote: >>> >> >>>> >>> >>> >> >>>> >>> Hi Felipe, >>> >> >>>> >>> >>> >> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware >>> >> >>>> >>> that depending on the configuration only a pipeline region can be >>> >> >>>> >>> restarted, not the whole job). 
>>> >> >>>> >>> >>> >> >>>> >>> But if all you want is to check whether it's a first attempt or not, >>> >> >>>> >>> you can also call context.isRestored() from initializeState() [2] >>> >> >>>> >>> >>> >> >>>> >>> [1] >>> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber-- >>> >> >>>> >>> >>> >> >>>> >>> [2] >>> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored-- >>> >> >>>> >>> >>> >> >>>> >>> Regards, >>> >> >>>> >>> Roman >>> >> >>>> >>> >>> >> >>>> >>> >>> >> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez >>> >> >>>> >>> <[hidden email]> wrote: >>> >> >>>> >>> > >>> >> >>>> >>> > Hello community, >>> >> >>>> >>> > >>> >> >>>> >>> > Is it possible to know programmatically how many times my Flink stream job restarted since it was running? >>> >> >>>> >>> > >>> >> >>>> >>> > My use case is like this. I have an Unit test that uses checkpoint and I throw one exception in a MapFunction for a given time, i.e.: for the 2 seconds ahead. Because Flink restarts the job and I have checkpoint I can recover the state and after 2 seconds I don't throw any exception anymore. Then I would like to know how many times the job was restarted. >>> >> >>>> >>> > >>> >> >>>> >>> > Thanks, >>> >> >>>> >>> > Felipe >>> >> >>>> >>> > |
On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan <[hidden email]> wrote:
> I tried to run the test that you mentioned
> (WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev.
> 6f08d0a.
>
> In IDE, I see that:
> - checkpoint is never triggered (sentence is too short, checkpoint
> pause and interval are too large)
> - exception is never thrown, so the job never restarted
> (currentTimeMillis is incremented but referenceTimeMillisAhead is not)
>
> When I add sleep between each element, set 10ms interval, 0ms pause
> and introduce some random exception, I do see

Do you mean inside the processElement() method?

What is the 0ms pause? Do you mean env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?

How do you create a random exception? Do you mean something other than my SimulatedException?

With the configuration I just described, it does not work for me. I am running the test from the terminal with "mvn -Dtest=WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery test". In IntelliJ I get the error "Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder" when I call "env.execute();":

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
    at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804)
    at org.sense.flink.examples.stream.edgent.WordCountFilterQEPTest.integrationTestWithPoisonPillRecovery(WordCountFilterQEPTest.java:210)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder

> 2021-06-18 17:37:53,241 INFO
> org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess -
> Attempts restart: 1
> in the logs.
>
> These settings probably differ on the cluster and there is some
> unrelated exception which causes a restart.
|
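One possible reading of the "random exception" asked about above, as a minimal sketch in plain Java. It assumes the goal is a failure that hits an unpredictable element (so that a checkpoint has a chance to complete first) and that stops after a fixed number of attempts. RandomFailureInjector, maybeFail and the constructor parameters are hypothetical names, not part of Flink; the attempt number is assumed to be supplied by the caller, for example from getRuntimeContext().getAttemptNumber() as suggested earlier in the thread.

```java
import java.io.Serializable;
import java.util.Random;

/** Hypothetical helper (not a Flink class): fails at random, but only during the first few attempts. */
final class RandomFailureInjector implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Random random;
    private final double failureProbability; // chance of failing on any single element
    private final int maxFailingAttempts;    // attempts 0 .. maxFailingAttempts-1 may fail

    RandomFailureInjector(long seed, double failureProbability, int maxFailingAttempts) {
        this.random = new Random(seed);
        this.failureProbability = failureProbability;
        this.maxFailingAttempts = maxFailingAttempts;
    }

    /**
     * Call once per element. The attempt number is passed in by the caller,
     * so this class does not need to remember earlier failures itself.
     */
    void maybeFail(Object element, int attemptNumber) {
        if (attemptNumber < maxFailingAttempts && random.nextDouble() < failureProbability) {
            throw new IllegalStateException("Simulated failure on element: " + element);
        }
    }
}
```

Inside a rich function this could be called as injector.maybeFail(value, getRuntimeContext().getAttemptNumber()). Bounding the failures by the attempt number rather than by a counter field is a deliberate choice in this sketch: instance fields of the function start from their initial values again when the task is redeployed, so a field-based limit would not hold across restarts.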
> do you mean inside the processElement() method?

I used a simple mapper with Thread.sleep before ExceptionSimulatorProcess.

> what is 0ms pause? do you mean env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?

Yes, env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);

> How do you create a random exception? do you mean not mine SimulatedException?

I mean it should be thrown at random because the checkpoint must reliably precede it, so that there is some state on recovery. Checking against 6666... only once assumes that the checkpoint was triggered before. Besides, a checkpoint is not guaranteed to be triggered before the end of input.

I tried to run it with Maven and it worked after making the source infinite.

From the code you provided, the parallelism level doesn't seem important and can be set to 1 (or the restart strategy set to full). Then using getRuntimeContext().getAttemptNumber() would be simpler and more reliable.

Regards,
Roman
|
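The point that the checkpoint must precede the failure can be illustrated with a toy model in plain Java. This is not Flink code: RestartCounterModel, completeCheckpoint() and failAndRestart() are made-up names, and the only behaviour modelled is the assumption that a restart sees the state of the last completed checkpoint, or nothing at all if no checkpoint completed. initialize() mirrors the counting logic of the initializeState() method discussed in the thread (start at 0, otherwise store max + 1).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model (not Flink API) of a restart counter kept in checkpointed list state. */
final class RestartCounterModel {
    private List<Long> liveState = new ArrayList<>(); // state seen by the running task
    private List<Long> lastCheckpoint = null;         // null: no checkpoint has completed yet

    /** Start at 0 on a fresh start; after a restore, store max + 1. */
    long initialize() {
        boolean restored = lastCheckpoint != null;
        liveState = restored ? new ArrayList<>(lastCheckpoint) : new ArrayList<>();
        long restarts = liveState.isEmpty() ? 0L : Collections.max(liveState) + 1;
        liveState.clear();
        liveState.add(restarts);
        return restarts;
    }

    /** Snapshot the live state, as a completed checkpoint would. */
    void completeCheckpoint() {
        lastCheckpoint = new ArrayList<>(liveState);
    }

    /** A failure discards the live state; only the last checkpoint survives. */
    long failAndRestart() {
        return initialize();
    }

    public static void main(String[] args) {
        RestartCounterModel noCheckpoint = new RestartCounterModel();
        System.out.println(noCheckpoint.initialize());     // prints 0
        System.out.println(noCheckpoint.failAndRestart()); // prints 0 again: nothing to restore

        RestartCounterModel withCheckpoint = new RestartCounterModel();
        withCheckpoint.initialize();
        withCheckpoint.completeCheckpoint();
        System.out.println(withCheckpoint.failAndRestart()); // prints 1
    }
}
```

In this model the counter only advances when a checkpoint completes between two failures; two failures in a row without a checkpoint in between both report 1. That is consistent with the "restarts: 0" logged twice in the integration test, and it is one reason the attempt number is the simpler signal when the whole job restarts.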