How to know (in code) how many times the job restarted?

Felipe Gutierrez
Hello community,

Is it possible to find out programmatically how many times my Flink streaming job has restarted since it started running?

My use case is as follows. I have a unit test that uses checkpointing, and I throw an exception in a MapFunction for a given period of time, i.e., for the next 2 seconds. Because Flink restarts the job and checkpointing is enabled, the state is recovered, and after those 2 seconds I no longer throw any exception. I would then like to know how many times the job was restarted.

Thanks,
Felipe

Re: How to know (in code) how many times the job restarted?

Roman Khachatryan
Hi Felipe,

You can use getRuntimeContext().getAttemptNumber() [1] (but beware
that depending on the configuration only a pipeline region can be
restarted, not the whole job).

But if all you want is to check whether it's a first attempt or not,
you can also call context.isRestored() from initializeState() [2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--

Regards,
Roman
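A small plain-Java model can make the difference between the two calls concrete. It is a sketch under stated assumptions, not Flink code: SubtaskRestartModel and its methods are hypothetical, attemptNumber() stands in for getRuntimeContext().getAttemptNumber() (0 on the first execution), and isRestored() stands in for what initializeState() would see. The modeled assumption is that state can only be restored from a checkpoint that completed before the failure, so a failure early in a job, as in a test that throws during its first 2 seconds, can raise the attempt number while isRestored() stays false.

```java
// Plain-Java model of one Flink subtask across restarts (hypothetical names,
// not Flink API). attemptNumber mirrors RuntimeContext#getAttemptNumber();
// restored mirrors what ManagedInitializationContext#isRestored() would report.
final class SubtaskRestartModel {
    // Id of the latest completed checkpoint; null means none has completed yet.
    private Long lastCompletedCheckpoint = null;
    private int attemptNumber = 0; // 0 = first execution
    private boolean restored = false;

    /** Records that a checkpoint completed during the current attempt. */
    void completeCheckpoint(long checkpointId) {
        lastCompletedCheckpoint = checkpointId;
    }

    /** Simulates a failure followed by a restart of this subtask. */
    void failAndRestart() {
        attemptNumber++;
        // Assumption: there is only something to restore if a checkpoint
        // completed before the failure.
        restored = lastCompletedCheckpoint != null;
    }

    int attemptNumber() {
        return attemptNumber;
    }

    boolean isRestored() {
        return restored;
    }
}
```

For example, one failAndRestart() on a fresh model gives attemptNumber() == 1 with isRestored() == false; after completeCheckpoint(1) and another failAndRestart(), the values are 2 and true.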

In reply to Felipe Gutierrez (Thu, Jun 10, 2021 at 5:30 PM)

Re: How to know (in code) how many times the job restarted?

Felipe Gutierrez
Cool!

I did it using this example https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state because the operator whose restarts I want to count is not on a keyed stream (yes, unfortunately I am using version 1.4).

Because I need to test it in an integration test, I am using a side output (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html) to attach a sink. I am not sure whether you have a better idea for testing restarts in an integration test; if you have a simple one, please tell me :). This is how I solved it...

Thanks
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
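The sink behind the side output usually follows the pattern from the testing page linked above: values go into a static list that the test inspects after the job finishes. Below is a plain-Java sketch of that pattern with the Flink wiring left out; RestartCountCollector, VALUES and maxReported() are hypothetical names. In a real test the class would implement SinkFunction<Long> and be attached to the side output. The list is static because Flink serializes the sink and runs deserialized copies of it, so an instance field would not be visible to the test method; this only works while the mini cluster runs in the same JVM as the test.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of a "collect sink" for tests (hypothetical names).
// In a real Flink test this class would implement SinkFunction<Long>.
final class RestartCountCollector {
    // Static: each parallel sink instance is a deserialized copy, so only a
    // static field is shared with the test (when everything runs in one JVM).
    // Synchronized list, because parallel instances may add concurrently.
    static final List<Long> VALUES = Collections.synchronizedList(new ArrayList<>());

    /** Called once per element arriving at the sink. */
    void invoke(Long value) {
        VALUES.add(value);
    }

    /** Largest restart count reported so far, or 0 if nothing arrived. */
    static long maxReported() {
        synchronized (VALUES) {
            return VALUES.isEmpty() ? 0L : Collections.max(VALUES);
        }
    }
}
```

Clear VALUES before submitting the job and assert on maxReported() after execute() returns.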

In reply to Roman Khachatryan (Thu, Jun 10, 2021 at 5:41 PM)

Re: How to know (in code) how many times the job restarted?

Felipe Gutierrez
I just realised that the ProcessFunction alone is enough; I don't need the CheckpointedFunction.

In reply to Felipe Gutierrez (Fri, 11 Jun 2021, 18:11)

Re: How to know (in code) how many times the job restarted?

Roman Khachatryan
You can also use accumulators [1] to collect the number of restarts
(and then access it via the client), but side outputs should work as well.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters

Regards,
Roman
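With an accumulator, the open question is how to merge the values reported by the parallel subtasks. One reasonable choice (an assumption, not something stated in this thread) is to have each subtask add getRuntimeContext().getAttemptNumber() in open() to a maximum-style accumulator, such as Flink's IntMaximum (check that your version provides it), and read the result on the client from the JobExecutionResult. The plain-Java model below shows why the maximum is a sensible merge: summing over-counts by the parallelism, and with region failover (the caveat about pipeline regions above) subtasks can report different attempt numbers. Because later attempts have higher numbers, the maximum is the same whether or not values reported by failed attempts survive. RestartAggregation and its methods are hypothetical.

```java
import java.util.Map;

// Plain-Java model of merging per-subtask attempt numbers (hypothetical
// names). Keys are subtask indices, values the attempt number each subtask
// reported when it was (re)started.
final class RestartAggregation {
    /** Maximum merge: how often the most-restarted subtask was restarted. */
    static int maxRestarts(Map<Integer, Integer> attemptBySubtask) {
        int max = 0;
        for (int attempt : attemptBySubtask.values()) {
            max = Math.max(max, attempt);
        }
        return max;
    }

    /** Sum merge, shown for contrast: counts one restart per restarted subtask. */
    static int summedAttempts(Map<Integer, Integer> attemptBySubtask) {
        int sum = 0;
        for (int attempt : attemptBySubtask.values()) {
            sum += attempt;
        }
        return sum;
    }
}
```

For a job with parallelism 3 that was restarted twice as a whole, every subtask reports 2: maxRestarts gives 2, while summedAttempts gives 6.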

In reply to Felipe Gutierrez (Sun, Jun 13, 2021 at 10:01 PM)

Re: How to know (in code) how many times the job restarted?

Felipe Gutierrez
So, I tried to improve it by using the CheckpointedFunction as shown here [1]. However, the documentation of isRestored() [2] says:

"Returns true, if state was restored from the snapshot of a previous execution. This returns always false for stateless tasks."

This is strange, because I am extending a ProcessFunction, which is a RichFunction.

public class AuctionExceptionSimulatorProcessFunction extends ProcessFunction<KeyedReportingData, KeyedReportingData>
        implements CheckpointedFunction {
...

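In the end, I cannot rely on isRestored(). Do you know what could be wrong? I used the same implementation approach as in [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--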

In reply to Roman Khachatryan (Mon, Jun 14, 2021 at 3:39 PM)

Re: How to know (in code) how many times the job restarted?

Arvid Heise
Does your ProcessFunction have state? If not, that behaviour would be in line with the documentation.

Also, which Flink version are you using? Before Flink 1.13, empty state was omitted, so I could imagine that `isRestored()` would return false; it should now also return true for empty state.
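A plain-Java model of the behaviour described here: if a snapshot leaves out an operator whose state is empty, a restart finds nothing to restore for it, so isRestored() reports false even though the job did restart. EmptyStateSnapshotModel is hypothetical and only mirrors the pre-1.13 behaviour as described in this message, not Flink's actual snapshot code; the omitEmptyState flag switches between the older and newer behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (hypothetical, not Flink code) of how omitting empty
// state from a snapshot affects isRestored() after a restart.
final class EmptyStateSnapshotModel {
    private final boolean omitEmptyState; // true = pre-1.13 behaviour as described above
    private List<Long> snapshot = null;   // null = nothing stored for this operator

    EmptyStateSnapshotModel(boolean omitEmptyState) {
        this.omitEmptyState = omitEmptyState;
    }

    /** Takes a checkpoint of the operator's list state. */
    void checkpoint(List<Long> state) {
        snapshot = (omitEmptyState && state.isEmpty()) ? null : new ArrayList<>(state);
    }

    /** What isRestored() would report after a restart from the last checkpoint. */
    boolean isRestoredAfterRestart() {
        return snapshot != null;
    }
}
```

With omitEmptyState set to true, checkpointing an empty list and restarting reports false, while checkpointing a list that already holds a seed value such as 0 reports true.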

In reply to Felipe Gutierrez (Tue, Jun 15, 2021 at 9:02 AM)

Re: How to know (in code) how many times the job restarted?

Felipe Gutierrez
Yes, I have state in the ProcessFunction. I tested it on a standalone cluster, and it returns true when the application recovers. However, in integration tests it does not return true. I am using Flink 1.4. Do you know where the Flink 1.13 release announcement (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) describes the change that would explain why I don't see `isRestored()` return true in integration tests?

In reply to Arvid Heise (Thu, Jun 17, 2021 at 4:09 PM)

Re: How to know (in code) how many times the job restarted?

Roman Khachatryan
Hi,

Could you please share the test code?

I think the returned value might depend on the level at which the
tests are executed. If it's a regular job, it should return the
correct value (as on a cluster). If the environment in which the code
is executed is mocked, it can be false.

Regards,
Roman

In reply to Felipe Gutierrez (Thu, Jun 17, 2021 at 4:15 PM)

Re: How to know (in code) how many times the job restarted?

Felipe Gutierrez
Sure, here it is. Nothing is mocked. I double-checked.

UnitTestClass {.....
protected static LocalFlinkMiniCluster flink;

@BeforeClass
public static void prepare() {
    flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
    flink.start();

    TestStreamEnvironment.setAsContext(flink, PARALLELISM);
}

private static Configuration getFlinkConfiguration() {
    Configuration flinkConfig = new Configuration();
    flinkConfig.setInteger("local.number-taskmanager", 1);
    flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
    flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
    flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
    try {
        flinkConfig.setString("state.checkpoints.dir", "file://" + tempFolder.newFolder().getAbsolutePath());
    } catch (IOException e) {
        throw new RuntimeException("error in flink cluster config", e);
    }
    return flinkConfig;
}


The class in which I check whether the job was restarted:

public class ExceptionSimulatorProcessFunction extends ProcessFunction<Object..., Object...>
        implements CheckpointedFunction {

    final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
    };
    private transient ListState<Long> restartsState;
    private Long restartsLocal;
    ...
    @Override
    public void processElement(Object value, Context ctx, Collector<Object> out) throws Exception {
        this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;

        // While the current time is still before the reference time AND the element is the poison transaction ID, throw an exception
        if (this.currentTimeMillis < this.referenceTimeMillisAhead && POISON__TRANSACTION_ID.equals(value.toString())) {

            LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
                    sdfMillis.format(new Date(this.currentTimeMillis)),
                    sdfMillis.format(new Date(this.referenceTimeMillisAhead)));

            throw new SimulatedException("Transaction ID: " + value.toString() +
                    " not allowed. This is a simple exception for testing purposes.");
        }
        out.collect(value);


        // counts the restarts
        if (restartsState != null) {
            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            Long attemptsRestart = 0L;
            if (restoreList != null && !restoreList.isEmpty()) {
                attemptsRestart = Collections.max(restoreList);
                if (restartsLocal < attemptsRestart) {
                    restartsLocal = attemptsRestart;
                    ctx.output(outputTag, Long.valueOf(attemptsRestart));
                }
            }
            LOG.info("Attempts restart: " + attemptsRestart);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {}

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("restarts", Long.class));

        if (context.isRestored()) {
            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            if (restoreList == null || restoreList.isEmpty()) {
                restartsState.add(1L);
                LOG.info("restarts: 1");
            } else {
                Long max = Collections.max(restoreList);
                LOG.info("restarts: " + max);
                restartsState.add(max + 1);
            }
        } else {
            LOG.info("restarts: never restored");
        }
    }
}
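If the cause is the empty-state behaviour Arvid describes, one workaround is to seed the counter on the first attempt so the operator state is never empty when a checkpoint is taken. The sketch below models only the counting decision in plain Java, with a List<Long> standing in for the restored contents of ListState<Long>; nextRestartCount is a hypothetical helper and the seeding is a suggested change, not part of the posted code. In the function itself, initializeState() would call restartsState.clear() and then restartsState.add(next), and also assign restartsLocal. As posted, restartsLocal is a Long field without a visible initial value, so `restartsLocal < attemptsRestart` would throw a NullPointerException unless it is set in the elided part. Checkpointing must also be enabled on the test job, and a checkpoint has to complete before the exception is thrown; otherwise there is nothing to restore.

```java
import java.util.Collections;
import java.util.List;

// Plain-Java model of the restart-counting decision in initializeState()
// (hypothetical helper). restoredValues stands in for the contents of
// ListState<Long> after a restore; it is empty on a fresh start.
final class RestartCounterLogic {
    static long nextRestartCount(boolean restored, List<Long> restoredValues) {
        if (!restored) {
            // First attempt: seed with 0 so the checkpointed state is not empty.
            return 0L;
        }
        if (restoredValues.isEmpty()) {
            // Restored but nothing stored: same fallback as the posted code.
            return 1L;
        }
        // As in the posted code, keep the largest stored value and add one.
        return Collections.max(restoredValues) + 1;
    }
}
```

With the seed, the first restore sees [0] and stores 1, the next sees [1] and stores 2, and so on. Because the counter lives in checkpointed state, it can still under-count when the job fails again before a new checkpoint completes (the same checkpoint is then restored twice); getAttemptNumber() does not depend on checkpoints in that way.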

In reply to Roman Khachatryan (Thu, Jun 17, 2021 at 5:20 PM)
Hi,

Could you please share the test code?

I think the returned value might depend on the level on which the
tests are executed. If it's a regular job then it should return the
correct value (as with cluster). If the environment in which the code
is executed is mocked then it can be false.

Regards,
Roman

On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
<[hidden email]> wrote:
>
> Yes, I have state on the ProcessFunction. I tested it on a stand-alone cluster and it returns true when the application recovers. However, in integration tests it does not returns true. I am using Flink 1.4. Do you know where it is saying at Flink release 1.13 (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) that I cannot see `isRestored()` equals true on integration tests?
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
>
>
> On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <[hidden email]> wrote:
>>
>> Does your ProcessFunction has state? If not it would be in line with the documentation.
>>
>> Also which Flink version are you using? Before Flink 1.13 empty state was omitted so I could imagine that `isRestored()` would return false but it should actually now also return true for empty state.
>>
>> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <[hidden email]> wrote:
>>>
>>> So, I tried to improve this by using CheckpointedFunction, as shown here [1]. But the documentation of isRestored() says [2]:
>>>
>>> "Returns true, if state was restored from the snapshot of a previous execution. This returns always false for stateless tasks."
>>>
>>> It is weird because I am extending a ProcessFunction which is a RichFunction.
>>>
>>> public class AuctionExceptionSimulatorProcessFunction extends ProcessFunction<KeyedReportingData, KeyedReportingData>
>>>         implements CheckpointedFunction {
>>> ...
>>>
>>> In the end, I cannot rely on isRestored(). Do you know what could be wrong? I used the same implementation approach as in [1].
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>>
>>>
>>> --
>>> -- Felipe Gutierrez
>>> -- skype: felipe.o.gutierrez
>>>
>>>
>>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <[hidden email]> wrote:
>>>>
>>>> You can also use accumulators [1] to collect the number of restarts
>>>> (and then access it via client); but side outputs should work as well.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>>>> <[hidden email]> wrote:
>>>> >
>>>> > I just realised that the ProcessFunction alone is enough. I don't need the CheckpointedFunction.
>>>> >
>>>> >
>>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <[hidden email]> wrote:
>>>> >>
>>>> >> Cool!
>>>> >>
>>>> >> I did it using this example https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state because the operator on which I want to count the number of restarts is not on a keyed stream. (Yes, I am unfortunately using version 1.4.)
>>>> >>
>>>> >> Because I need to test it in an integration test, I am using a side output (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html) to attach a sink. I am not sure whether you have a better idea for testing the restarts in an integration test. If you have a simple idea, please tell me :). This is how I solved it.
>>>> >>
>>>> >> Thanks
>>>> >> Felipe
>>>> >>
>>>> >> --
>>>> >> -- Felipe Gutierrez
>>>> >> -- skype: felipe.o.gutierrez
>>>> >>
>>>> >>
>>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <[hidden email]> wrote:
>>>> >>>
>>>> >>> Hi Felipe,
>>>> >>>
>>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>>>> >>> that depending on the configuration only a pipeline region can be
>>>> >>> restarted, not the whole job).
>>>> >>>
>>>> >>> But if all you want is to check whether it's a first attempt or not,
>>>> >>> you can also call context.isRestored() from initializeState() [2]
>>>> >>>
>>>> >>> [1]
>>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>>>> >>>
>>>> >>> [2]
>>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>>> >>>
>>>> >>> Regards,
>>>> >>> Roman
>>>> >>>
>>>> >>>
>>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>>>> >>> <[hidden email]> wrote:
>>>> >>> >
>>>> >>> > Hello community,
>>>> >>> >
>>>> >>> > Is it possible to know programmatically how many times my Flink stream job has restarted since it started running?
>>>> >>> >
>>>> >>> > My use case is as follows. I have a unit test that uses checkpoints, and I throw an exception in a MapFunction for a given period, i.e., for the next 2 seconds. Because Flink restarts the job and I have checkpointing, I can recover the state, and after 2 seconds I no longer throw any exception. I would then like to know how many times the job was restarted.
>>>> >>> >
>>>> >>> > Thanks,
>>>> >>> > Felipe
>>>> >>> >

Re: How to know (in code) how many times the job restarted?

Roman Khachatryan
Thanks for sharing,

I think the problem is that restartsState is never updated:
- on the first attempt, context.isRestored() returns false (and "never restored" is logged)
- on subsequent attempts, it again returns false, because the state was never updated before

Adding
if (!context.isRestored()) { restartsState.add(0L); }
should solve the problem
(it's also better to use state.update instead of state.add if only the max is needed).

Regards,
Roman
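The fix suggested above can be illustrated with a plain-Java model that runs without Flink. This is a sketch under stated assumptions, not Flink's API: the class name RestartCounterModel is hypothetical, a java.util.List stands in for the operator ListState<Long>, the boolean argument plays the role of context.isRestored(), and the model assumes a checkpoint completes between each initialization and the next failure. Seeding 0 on the first attempt makes the next snapshot non-empty, so each later attempt sees the previous value and stores previous + 1.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java model (no Flink dependency) of the suggested fix.
 * restartsState is a hypothetical stand-in for the operator ListState<Long>;
 * the isRestored argument plays the role of context.isRestored().
 */
public class RestartCounterModel {
    private final List<Long> restartsState = new ArrayList<>();

    /** Mirrors initializeState(); returns the restart count for this attempt. */
    public long initializeState(boolean isRestored, List<Long> restored) {
        restartsState.clear();
        if (!isRestored) {
            // First attempt: seed the state so the next snapshot is not empty.
            restartsState.add(0L);
            return 0L;
        }
        long previous = restored.isEmpty() ? 0L : Collections.max(restored);
        // Keep only the latest value (update-style) instead of appending with add().
        restartsState.add(previous + 1);
        return previous + 1;
    }

    /** What a completed checkpoint would capture from the list state. */
    public List<Long> snapshot() {
        return new ArrayList<>(restartsState);
    }

    public static void main(String[] args) {
        List<Long> lastSnapshot = new ArrayList<>();
        for (int attempt = 0; attempt < 3; attempt++) {
            RestartCounterModel task = new RestartCounterModel(); // new instance per attempt
            long restarts = task.initializeState(attempt > 0, lastSnapshot);
            System.out.println("attempt " + attempt + " -> restarts: " + restarts);
            // Assumes a checkpoint completes after initialization and before the next failure.
            lastSnapshot = task.snapshot();
        }
    }
}
```

Running main prints restart counts 0, 1 and 2 for the three simulated attempts. If no checkpoint completed before a failure, the model's assumption breaks and the count would not advance.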

On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
<[hidden email]> wrote:

>
> Sure, here it is. Nothing is mocked. I double-checked.
>
> UnitTestClass {.....
> protected static LocalFlinkMiniCluster flink;
>
> @BeforeClass
> public static void prepare() {
>     flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
>     flink.start();
>
>     TestStreamEnvironment.setAsContext(flink, PARALLELISM);
> }
>
> private static Configuration getFlinkConfiguration() {
>     Configuration flinkConfig = new Configuration();
>     flinkConfig.setInteger("local.number-taskmanager", 1);
>     flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
>     flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
>     flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
>     try {
>         flinkConfig.setString("state.checkpoints.dir", "file://" + tempFolder.newFolder().getAbsolutePath());
>     } catch (IOException e) {
>         throw new RuntimeException("error in flink cluster config", e);
>     }
>     return flinkConfig;
> }
>
>
> The class in which I check whether the job was restarted:
>
> public class ExceptionSimulatorProcessFunction extends ProcessFunction<Object..., Object...>
>         implements CheckpointedFunction {
>
>     final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
>     };
>     private transient ListState<Long> restartsState;
>     private Long restartsLocal;
>     ...
>     @Override
>     public void processElement(Object value, Context ctx, Collector<Object> out) throws Exception {
>         this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;
>
>         // If the current time is less than the reference time ahead AND we have the poison auction, an exception is thrown
>         if (this.currentTimeMillis < this.referenceTimeMillisAhead && POISON__TRANSACTION_ID.equals(value.toString())) {
>
>             LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
>                     sdfMillis.format(new Date(this.currentTimeMillis)),
>                     sdfMillis.format(new Date(this.referenceTimeMillisAhead)));
>
>             throw new SimulatedException("Transaction ID: " + value.toString() +
>                     " not allowed. This is a simple exception for testing purposes.");
>         }
>         out.collect(value);
>
>
>         // counts the restarts
>         if (restartsState != null) {
>             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>             Long attemptsRestart = 0L;
>             if (restoreList != null && !restoreList.isEmpty()) {
>                 attemptsRestart = Collections.max(restoreList);
>                 if (restartsLocal < attemptsRestart) {
>                     restartsLocal = attemptsRestart;
>                     ctx.output(outputTag, Long.valueOf(attemptsRestart));
>                 }
>             }
>             LOG.info("Attempts restart: " + attemptsRestart);
>         }
>     }
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext context) throws Exception {}
>
>     @Override
>     public void initializeState(FunctionInitializationContext context) throws Exception {
>         restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("restarts", Long.class));
>
>         if (context.isRestored()) {
>             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>             if (restoreList == null || restoreList.isEmpty()) {
>                 restartsState.add(1L);
>                 LOG.info("restarts: 1");
>             } else {
>                 Long max = Collections.max(restoreList);
>                 LOG.info("restarts: " + max);
>                 restartsState.add(max + 1);
>             }
>         } else {
>             LOG.info("restarts: never restored");
>         }
>     }
> }

Re: How to know (in code) how many times the job restarted?

Felipe Gutierrez
No, it didn't work.

context.isRestored() returns true when I run the application on the Flink standalone cluster and it recovers after a failure. When I do the same in an integration test, it does not return true after a failure. I can log the exception that causes the failure, and initializeState() is called again after it, but context.isRestored() is false again. I also tried setting the state to 0 on the first attempt ("if (!context.isRestored()) { restartsState.add(0L); }"), and it does not work.
I think the problem is neither the ListState I am using nor context.isRestored(). It is context.getOperatorStateStore(), which is always null, but only in integration tests. Using the code below, I see "restarts: 0" twice in the logs, before and after the failure.

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // unit tests do not open the OperatorStateStore
        if (context.getOperatorStateStore() != null) {
            restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("restarts", Long.class));

            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            if (restoreList == null || restoreList.isEmpty()) {
                restartsState.add(0L);
                LOG.info("restarts: 0");
            } else {
                Long max = Collections.max(restoreList);
                LOG.info("restarts: " + max);
                restartsState.add(max + 1);
            }
        }
    }

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
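To make the "restarts: 0" logs easier to interpret, here is a plain-Java model of the initializeState() rule in the message above, with no Flink dependency. The class and method names are hypothetical, and the list argument stands in for the restored ListState contents. If the state really were restored on every attempt, add() would make the list grow ([0], then [0, 1], then [0, 1, 2]), which is why update-style replacement was suggested in the thread. Seeing "restarts: 0" on every attempt matches the case where the restored list is empty; the cause of that (for example, no checkpoint completing before the failure) is an assumption here, not something the thread confirms.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java model (no Flink dependency) of the posted initializeState() rule.
 * The List<Long> argument is a hypothetical stand-in for the restored ListState.
 */
public class AppendCounterModel {

    /** Applies the same rule as the posted code and returns the new list state. */
    public static List<Long> initializeState(List<Long> restoredState) {
        List<Long> state = new ArrayList<>(restoredState);
        if (state.isEmpty()) {
            state.add(0L);                          // logged as "restarts: 0"
        } else {
            state.add(Collections.max(state) + 1);  // appends; earlier values stay in the list
        }
        return state;
    }

    public static void main(String[] args) {
        // Case 1: every attempt restores what the previous attempt wrote.
        List<Long> state = new ArrayList<>();
        for (int attempt = 0; attempt < 3; attempt++) {
            state = initializeState(state);
            System.out.println("restored each time: " + state);
        }
        // Case 2: nothing is restored, so every attempt starts from an empty list.
        for (int attempt = 0; attempt < 2; attempt++) {
            System.out.println("nothing restored:   " + initializeState(new ArrayList<>()));
        }
    }
}
```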


On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <[hidden email]> wrote:
Thanks for sharing,

I think the problem is that restartsState is never updated:
- on the first attempt, context.isRestored() returns false (and "never restored" is logged)
- on subsequent attempts, it again returns false, because the state was never updated before

Adding
if (!context.isRestored()) { restartsState.add(0L); }
should solve the problem
(it's also better to use state.update instead of state.add if only the max is needed).

Regards,
Roman


Re: How to know (in code) how many times the job restarted?

Felipe Gutierrez
I investigated a little more. I created the same POC on Flink 1.13. I have a ProcessFunction in which I want to count how many times it recovers. I tested with ListState and ValueState, and it seems that during the integration test (and only there) the process hangs in the open() or initializeState() methods.

If I comment out the ListState or the ValueState, the integration test works. It first throws an exception, which I see in the output. After 2 seconds it stops throwing exceptions, and the output matches the expected output. But then I have no way to count how many restarts happened during the integration test. I can only count the restarts when I run the application on a standalone Flink cluster.

I don't know. Maybe I have to configure some parameters to work with state in integration tests?

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez


On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez <[hidden email]> wrote:
No, it didn't work.

context.isRestored() returns true when I run the application on the Flink standalone cluster and it recovers after a failure. When I do the same in an integration test, it does not return true after a failure. I can log the exception that causes the failure, and initializeState() is called again after it, but context.isRestored() is false again. I also tried setting the state to 0 on the first attempt ("if (!context.isRestored()) { restartsState.add(0L); }"), and it does not work.
I think the problem is neither the ListState I am using nor context.isRestored(). It is context.getOperatorStateStore(), which is always null, but only in integration tests. Using the code below, I see "restarts: 0" twice in the logs, before and after the failure.

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // unit tests do not open the OperatorStateStore
        if (context.getOperatorStateStore() != null) {
            restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("restarts", Long.class));

            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            if (restoreList == null || restoreList.isEmpty()) {
                restartsState.add(0L);
                LOG.info("restarts: 0");
            } else {
                Long max = Collections.max(restoreList);
                LOG.info("restarts: " + max);
                restartsState.add(max + 1);
            }
        }
    }

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez


>> Hi,
>>
>> Could you please share the test code?
>>
>> I think the returned value might depend on the level on which the
>> tests are executed. If it's a regular job then it should return the
>> correct value (as with cluster). If the environment in which the code
>> is executed is mocked then it can be false.
>>
>> Regards,
>> Roman
>>
>> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
>> <[hidden email]> wrote:
>> >
>> > Yes, I have state in the ProcessFunction. I tested it on a stand-alone cluster and it returns true when the application recovers. However, in integration tests it does not return true. I am using Flink 1.4. Do you know where the Flink 1.13 release notes (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that `isRestored()` does not return true in integration tests?
>> >
>> > --
>> > -- Felipe Gutierrez
>> > -- skype: felipe.o.gutierrez
>> >
>> >
>> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <[hidden email]> wrote:
>> >>
>> >> Does your ProcessFunction have state? If not, that would be in line with the documentation.
>> >>
>> >> Also, which Flink version are you using? Before Flink 1.13, empty state was omitted, so I could imagine `isRestored()` returning false; as of 1.13 it should also return true for empty state.
>> >>
>> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <[hidden email]> wrote:
>> >>>
>> >>> So, I tried to improve this by using CheckpointedFunction as shown in [1]. But the documentation of isRestored() says [2]:
>> >>>
>> >>> "Returns true, if state was restored from the snapshot of a previous execution. This returns always false for stateless tasks."
>> >>>
>> >>> It is weird because I am extending a ProcessFunction, which is a RichFunction.
>> >>>
>> >>> public class AuctionExceptionSimulatorProcessFunction extends ProcessFunction<KeyedReportingData, KeyedReportingData>
>> >>>         implements CheckpointedFunction {
>> >>> ...
>> >>>
>> >>> In the end, I cannot rely on "isRestored()". Do you know what could be wrong? I followed the same implementation as in [1].
>> >>>
>> >>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>> >>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >>>
>> >>>
>> >>> --
>> >>> -- Felipe Gutierrez
>> >>> -- skype: felipe.o.gutierrez
>> >>>
>> >>>
>> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <[hidden email]> wrote:
>> >>>>
>> >>>> You can also use accumulators [1] to collect the number of restarts
>> >>>> (and then access it via client); but side outputs should work as well.
>> >>>>
>> >>>> [1]
>> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>> >>>>
>> >>>> Regards,
>> >>>> Roman
>> >>>>
>> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>> >>>> <[hidden email]> wrote:
>> >>>> >
>> >>>> > I just realised that the ProcessFunction alone is enough. I don't need the CheckpointedFunction.
>> >>>> >
>> >>>> >
>> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <[hidden email]> wrote:
>> >>>> >>
>> >>>> >> Cool!
>> >>>> >>
>> >>>> >> I did it using this example https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state because the operator on which I want to count restarts is not on a keyed stream (yes, unfortunately I am using version 1.4).
>> >>>> >>
>> >>>> >> Because I need to test it in an integration test, I am using a side output (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html) to attach a sink. If you have a better or simpler idea for testing restarts in an integration test, please tell me :). This is how I solved it....
>> >>>> >>
>> >>>> >> Thanks
>> >>>> >> Felipe
>> >>>> >>
>> >>>> >> --
>> >>>> >> -- Felipe Gutierrez
>> >>>> >> -- skype: felipe.o.gutierrez
>> >>>> >>
>> >>>> >>
>> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <[hidden email]> wrote:
>> >>>> >>>
>> >>>> >>> Hi Felipe,
>> >>>> >>>
>> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>> >>>> >>> that depending on the configuration only a pipeline region can be
>> >>>> >>> restarted, not the whole job).
>> >>>> >>>
>> >>>> >>> But if all you want is to check whether it's a first attempt or not,
>> >>>> >>> you can also call context.isRestored() from initializeState() [2]
>> >>>> >>>
>> >>>> >>> [1]
>> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>> >>>> >>>
>> >>>> >>> [2]
>> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >>>> >>>
>> >>>> >>> Regards,
>> >>>> >>> Roman
>> >>>> >>>
>> >>>> >>>
>> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>> >>>> >>> <[hidden email]> wrote:
>> >>>> >>> >
>> >>>> >>> > Hello community,
>> >>>> >>> >
>> >>>> >>> > Is it possible to know programmatically how many times my Flink streaming job has restarted since it started running?
>> >>>> >>> >
>> >>>> >>> > My use case is as follows. I have a unit test that uses checkpointing, and I throw an exception in a MapFunction for a given period, i.e., for the next 2 seconds. Because Flink restarts the job and I have checkpointing, I can recover the state, and after 2 seconds I don't throw any exceptions anymore. Then I would like to know how many times the job was restarted.
>> >>>> >>> >
>> >>>> >>> > Thanks,
>> >>>> >>> > Felipe
>> >>>> >>> >

Re: How to know (in code) how many times the job restarted?

Roman Khachatryan
I tried to run the test that you mentioned
(WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev.
6f08d0a.

In the IDE, I see that:
- the checkpoint is never triggered (the sentence is too short, and the
checkpoint pause and interval are too large)
- the exception is never thrown, so the job is never restarted
(currentTimeMillis is incremented but referenceTimeMillisAhead is not)

When I add a sleep between elements, set the checkpoint interval to 10 ms
and the pause to 0 ms, and introduce some random exception, I do see the
following in the logs:

2021-06-18 17:37:53,241 INFO org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess  - Attempts restart: 1

These settings probably differ on the cluster and there is some
unrelated exception which causes a restart.

Regards,
Roman
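The two conditions discussed in this thread (the counter must be written on a fresh start, and a checkpoint holding that value must complete before the failure) can be sketched as a plain-Java model with no Flink dependency. This is only an illustration under stated assumptions: a restart restores the last *completed* checkpoint, and an empty checkpointed list is treated as "not restored" (the pre-1.13 behaviour Arvid describes in this thread). `RestartCounterModel`, `runAttempt` and `countAfter` are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java model (no Flink dependency) of the "restarts" ListState counter.
 * Assumptions (not Flink APIs): a restart restores only the last *completed*
 * checkpoint, and an empty checkpointed list counts as "not restored".
 */
public class RestartCounterModel {

    // Stand-in for the "restarts" list in the last completed checkpoint.
    private List<Long> lastCompletedCheckpoint = new ArrayList<>();

    /** One attempt: the initializeState() logic, then optionally a completed checkpoint. */
    long runAttempt(boolean seedOnFreshStart, boolean checkpointCompletes) {
        List<Long> state = new ArrayList<>(lastCompletedCheckpoint);
        boolean isRestored = !state.isEmpty();
        if (isRestored) {
            long max = Collections.max(state);
            state.clear();          // keep only the max, in the spirit of replacing the list via update
            state.add(max + 1);
        } else if (seedOnFreshStart) {
            state.add(0L);          // the suggested fix: write 0 when not restored
        }
        if (checkpointCompletes) {
            lastCompletedCheckpoint = new ArrayList<>(state);
        }
        return state.isEmpty() ? 0L : Collections.max(state);
    }

    /** Value seen by the last attempt after the given number of restarts. */
    public static long countAfter(int restarts, boolean seed, boolean checkpointBeforeEachFailure) {
        RestartCounterModel model = new RestartCounterModel();
        long count = 0L;
        for (int attempt = 0; attempt <= restarts; attempt++) {
            count = model.runAttempt(seed, checkpointBeforeEachFailure);
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countAfter(1, false, true)); // not seeded: 0
        System.out.println(countAfter(1, true, false)); // no checkpoint completes: 0
        System.out.println(countAfter(1, true, true));  // seeded and checkpointed: 1
    }
}
```

With one restart, the model reports 1 only when both conditions hold, which is consistent with the `Attempts restart: 1` log line above; with a checkpoint that never completes (as in the IDE run described above), the count stays at 0 regardless of the fix.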

On Fri, Jun 18, 2021 at 12:20 PM Felipe Gutierrez
<[hidden email]> wrote:

>
> I investigated a little more. I created the same POC on Flink 1.13. I have this ProcessFunction in which I want to count how many times it recovers. I tested with ListState and ValueState, and it seems that during the integration test (only in the integration test) the process hangs in the open() or initializeState() method.
>
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java
>
> Here is my integration test: https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154
>
> If I comment out the ListState or the ValueState, the integration test works. It first throws an exception and I see it in the output. Then, after 2 seconds, it no longer throws exceptions, and when I compare the outputs they match. But I have no way to count how many restarts happened during the integration test. I can only count the restarts when I run the application on a stand-alone Flink cluster.
>
> I don't know. Do I maybe have to configure some parameters to use state in integration tests?
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
>
>
> On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez <[hidden email]> wrote:
>>
>> No, it didn't work.
>>
>> "context.isRestored()" returns true when I run the application on the Flink standalone cluster and it is recovering after a failure. When I do the same in an integration test, it does not return true after a failure. I mean, I can log the exception that causes the failure, and initializeState() is called after the failure, but context.isRestored() is false again. I also tried initializing the state to 0 the first time, "if (!context.isRestored()) { restartsState.add(0L); }", and it does not work.
>> I think the problem is neither the ListState that I am using nor context.isRestored(). It is "context.getOperatorStateStore()", which is always null, but only in integration tests. Using the code below, I see "restarts: 0" twice in the logs, before and after the failure.
>>
>>     @Override
>>     public void initializeState(FunctionInitializationContext context) throws Exception {
>>         // unit tests do not open the OperatorStateStore
>>         if (context.getOperatorStateStore() != null) {
>>             restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("restarts", Long.class));
>>
>>             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>>             if (restoreList == null || restoreList.isEmpty()) {
>>                 restartsState.add(0L);
>>                 LOG.info("restarts: 0");
>>             } else {
>>                 Long max = Collections.max(restoreList);
>>                 LOG.info("restarts: " + max);
>>                 restartsState.add(max + 1);
>>             }
>>         }
>>     }
>>
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>>
>>
>> On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <[hidden email]> wrote:
>>>
>>> Thanks for sharing,
>>>
>>> I think the problem is that restartsState is never updated:
>>> - on the first attempt, context.isRestored() returns false (and "never
>>> restored" is logged)
>>> - on subsequent attempts, it again returns false, because the state
>>> was never updated before
>>>
>>> Adding
>>> if (!context.isRestored()) { restartsState.add(0L); }
>>> should solve the problem
>>> (it's also better to use state.update instead of state.add if only max
>>> is needed).
>>>
>>> Regards,
>>> Roman
>>>

Re: How to know (in code) how many times the job restarted?

Felipe Gutierrez


On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan <[hidden email]> wrote:
I tried to run the test that you mentioned
(WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev.
6f08d0a.

In IDE, I see that:
- checkpoint is never triggered (sentence is too short, checkpoint
pause and interval are too large)
- exception is never thrown, so the job never restarted
(currentTimeMillis is incremented but referenceTimeMillisAhead is not)

When I add sleep between each element, set 10ms interval, 0ms pause
and introduce some random exception, I do see

Do you mean inside the processElement() method?

What is the 0ms pause? Do you mean env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?

How do you create a random exception? Do you mean an exception other than my SimulatedException?

With the configurations I just mentioned, it still does not work for me. I am testing in the terminal with "mvn -Dtest=WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery test". In IntelliJ I get the error "Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder" when I call "env.execute();"

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804)
at org.sense.flink.examples.stream.edgent.WordCountFilterQEPTest.integrationTestWithPoisonPillRecovery(WordCountFilterQEPTest.java:210)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder


 
2021-06-18 17:37:53,241 INFO org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess  - Attempts restart: 1
in the logs.

These settings probably differ on the cluster and there is some
unrelated exception which causes a restart.

Regards,
Roman

On Fri, Jun 18, 2021 at 12:20 PM Felipe Gutierrez
<[hidden email]> wrote:
>
> I investigated a little bit more. I created the same POC on a Flink version 1.13. I have this ProcessFunction where I want to count the times it recovers. I tested with ListState and ValueState and it seems that during the integration test (only for integration test) the process is hanging on the open() or on the initializeState() methods.
>
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java
>
> Here is my integration test: https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154
>
> If I comment out the state ListState or the ValueState, the integration test works. It first throws an exception and I see it on the output. Then after 2 seconds it does not throw exceptions. Then I compare the output and they match. But I don't have a way to count how many restarts happened during the integration test. I con only count the restarts when I run the application on a stand-alone flink cluster.
>
> I don't know. Maybe, do I have to configure some parameters to work with the state on integration tests?
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
>
>
> On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez <[hidden email]> wrote:
>>
>> No, it didn't work.
>>
>> The "context.isRestored()" returns true when I run the application on the Flink standalone-cluster and it is recovering after a failure. When I do the same on a integration test it does not returns true after a failure. I mean, I can log the exception that is causing the failure, the initializeState() is called after a failure, but the context.isRestored() is false again. I also tried to update the state on the first time to 0 "if (!context.isRestored()) { restartsState.add(0L); }" and it does not work.
>> I think the problem is not on the ListState that I am using and not on the context.isRestore() as well. It is on the "context.getOperatorStateStore()" that is always null only on integration tests. Using the below code I can see on the logs "restarts: 0" twice, before and after failure.
>>
>>     @Override
>>     public void initializeState(FunctionInitializationContext context) throws Exception {
>>         // unit tests does not open OperatorStateStore
>>         if (context.getOperatorStateStore() != null) {
>>             restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("restarts", Long.class));
>>
>>             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>>             if (restoreList == null || restoreList.isEmpty()) {
>>                 restartsState.add(0L);
>>                 LOG.info("restarts: 0");
>>             } else {
>>                 Long max = Collections.max(restoreList);
>>                 LOG.info("restarts: " + max);
>>                 restartsState.add(max + 1);
>>             }
>>         }
>>     }
>>
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>>
>>
>> On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <[hidden email]> wrote:
>>>
>>> Thanks for sharing,
>>>
>>> I think the problem is that restartsState is never updated:
>>> - on the first attempt, context.isRestored() returns false (and "never
>>> restored" is logged)
>>> - on subsequent attempts, it again returns false, because the state
>>> was never updated before
>>>
>>> Adding
>>> if (!context.isRestored()) { restartsState.add(0L); }
>>> should solve the problem
>>> (it's also better to use state.update instead of state.add if only max
>>> is needed).
>>>
>>> Regards,
>>> Roman
>>>
>>> On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
>>> <[hidden email]> wrote:
>>> >
>>> > Sure, here it is. Nothing is mocked. I double-checked.
>>> >
>>> > UnitTestClass {.....
>>> > protected static LocalFlinkMiniCluster flink;
>>> >
>>> > @BeforeClass
>>> > public static void prepare() {
>>> >     flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
>>> >     flink.start();
>>> >
>>> >     TestStreamEnvironment.setAsContext(flink, PARALLELISM);
>>> > }
>>> >
>>> > private static Configuration getFlinkConfiguration() {
>>> >     Configuration flinkConfig = new Configuration();
>>> >     flinkConfig.setInteger("local.number-taskmanager", 1);
>>> >     flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
>>> >     flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
>>> >     flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
>>> >     try {
>>> >         flinkConfig.setString("state.checkpoints.dir", "file://" + tempFolder.newFolder().getAbsolutePath());
>>> >     } catch (IOException e) {
>>> >         throw new RuntimeException("error in flink cluster config", e);
>>> >     }
>>> >     return flinkConfig;
>>> > }
>>> >
>>> >
>>> > The class in which I check whether the job was restarted:
>>> >
>>> > public class ExceptionSimulatorProcessFunction extends ProcessFunction<Object..., Object...>
>>> >         implements CheckpointedFunction {
>>> >
>>> >     final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
>>> >     };
>>> >     private transient ListState<Long> restartsState;
>>> >     private Long restartsLocal;
>>> >     ...
>>> >     @Override
>>> >     public void processElement(Object value, Context ctx, Collector<Object> out) throws Exception {
>>> >         this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;
>>> >
>>> >         // If the current time is before the reference time ahead AND the value is the poison transaction ID, throw an exception
>>> >         if (this.currentTimeMillis < this.referenceTimeMillisAhead && POISON__TRANSACTION_ID.equals(value.toString())) {
>>> >
>>> >             LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
>>> >                     sdfMillis.format(new Date(this.currentTimeMillis)),
>>> >                     sdfMillis.format(new Date(this.referenceTimeMillisAhead)));
>>> >
>>> >             throw new SimulatedException("Transaction ID: " + value.toString() +
>>> >                     " not allowed. This is a simple exception for testing purposes.");
>>> >         }
>>> >         out.collect(value);
>>> >
>>> >
>>> >         // counts the restarts
>>> >         if (restartsState != null) {
>>> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>>> >             Long attemptsRestart = 0L;
>>> >             if (restoreList != null && !restoreList.isEmpty()) {
>>> >                 attemptsRestart = Collections.max(restoreList);
>>> >                 if (restartsLocal < attemptsRestart) {
>>> >                     restartsLocal = attemptsRestart;
>>> >                     ctx.output(outputTag, Long.valueOf(attemptsRestart));
>>> >                 }
>>> >             }
>>> >             LOG.info("Attempts restart: " + attemptsRestart);
>>> >         }
>>> >     }
>>> >
>>> >     @Override
>>> >     public void snapshotState(FunctionSnapshotContext context) throws Exception {}
>>> >
>>> >     @Override
>>> >     public void initializeState(FunctionInitializationContext context) throws Exception {
>>> >         restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("restarts", Long.class));
>>> >
>>> >         if (context.isRestored()) {
>>> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>>> >             if (restoreList == null || restoreList.isEmpty()) {
>>> >                 restartsState.add(1L);
>>> >                 LOG.info("restarts: 1");
>>> >             } else {
>>> >                 Long max = Collections.max(restoreList);
>>> >                 LOG.info("restarts: " + max);
>>> >                 restartsState.add(max + 1);
>>> >             }
>>> >         } else {
>>> >             LOG.info("restarts: never restored");
>>> >         }
>>> >     }
>>> > }
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <[hidden email]> wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> Could you please share the test code?
>>> >>
>>> >> I think the returned value might depend on the level on which the
>>> >> tests are executed. If it's a regular job then it should return the
>>> >> correct value (as with cluster). If the environment in which the code
>>> >> is executed is mocked then it can be false.
>>> >>
>>> >> Regards,
>>> >> Roman
>>> >>
>>> >> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
>>> >> <[hidden email]> wrote:
>>> >> >
>>> >> > Yes, I have state in the ProcessFunction. I tested it on a standalone cluster and it returns true when the application recovers. However, in integration tests it does not return true. I am using Flink 1.4. Do you know where the Flink 1.13 release post (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) says that `isRestored()` cannot be true in integration tests?
>>> >> >
>>> >> > --
>>> >> > -- Felipe Gutierrez
>>> >> > -- skype: felipe.o.gutierrez
>>> >> >
>>> >> >
>>> >> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <[hidden email]> wrote:
>>> >> >>
>>> >> >> Does your ProcessFunction have state? If not, that would be in line with the documentation.
>>> >> >>
>>> >> >> Also, which Flink version are you using? Before Flink 1.13, empty state was omitted, so I could imagine that `isRestored()` would return false; now it should also return true for empty state.
>>> >> >>
>>> >> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <[hidden email]> wrote:
>>> >> >>>
>>> >> >>> So, I tried to improve it by using CheckpointedFunction as shown here [1]. But the documentation of isRestored() [2] says:
>>> >> >>>
>>> >> >>> "Returns true, if state was restored from the snapshot of a previous execution. This returns always false for stateless tasks."
>>> >> >>>
>>> >> >>> It is weird because I am extending a ProcessFunction which is a RichFunction.
>>> >> >>>
>>> >> >>> public class AuctionExceptionSimulatorProcessFunction extends ProcessFunction<KeyedReportingData, KeyedReportingData>
>>> >> >>>         implements CheckpointedFunction {
>>> >> >>> ...
>>> >> >>>
>>> >> >>> In the end, I cannot rely on "isRestored()". Do you know what could be wrong? I used the same implementation as in [1].
>>> >> >>>
>>> >> >>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>>> >> >>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>> >> >>>
>>> >> >>>
>>> >> >>> --
>>> >> >>> -- Felipe Gutierrez
>>> >> >>> -- skype: felipe.o.gutierrez
>>> >> >>>
>>> >> >>>
>>> >> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <[hidden email]> wrote:
>>> >> >>>>
>>> >> >>>> You can also use accumulators [1] to collect the number of restarts
>>> >> >>>> (and then access it via client); but side outputs should work as well.
>>> >> >>>>
>>> >> >>>> [1]
>>> >> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>>> >> >>>>
>>> >> >>>> Regards,
>>> >> >>>> Roman
>>> >> >>>>
>>> >> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>>> >> >>>> <[hidden email]> wrote:
>>> >> >>>> >
>>> >> >>>> > I just realised that the ProcessFunction alone is enough. I don't need the CheckpointedFunction.
>>> >> >>>> >
>>> >> >>>> >
>>> >> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <[hidden email]> wrote:
>>> >> >>>> >>
>>> >> >>>> >> Cool!
>>> >> >>>> >>
>>> >> >>>> >> I did it using this example https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state because I don't have a keyed stream on the specific operator where I want to count the number of restarts (yes, I am using version 1.4, unfortunately).
>>> >> >>>> >>
>>> >> >>>> >> Because I need to test it in an integration test, I am using a side output (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html) to attach a sink. If you have a better or simpler idea for testing the restarts in an integration test, please tell me :). This is how I solved it....
>>> >> >>>> >>
>>> >> >>>> >> Thanks
>>> >> >>>> >> Felipe
>>> >> >>>> >>
>>> >> >>>> >> --
>>> >> >>>> >> -- Felipe Gutierrez
>>> >> >>>> >> -- skype: felipe.o.gutierrez
>>> >> >>>> >>
>>> >> >>>> >>

Re: How to know (in code) how many times the job restarted?

Roman Khachatryan
> do you mean inside the processElement() method?
I used a simple mapper with Thread.sleep before ExceptionSimulatorProcess.

> what is 0ms pause? do you mean env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?
Yes, env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);

> How do you create a random exception? do you mean not mine SimulatedException?
I mean it should be thrown at random, because a checkpoint must
reliably precede it, so that there is some state on recovery. Checking
against 6666... only once assumes that the checkpoint was triggered
before. Besides, a checkpoint is not guaranteed to be triggered before
the end of input.
I tried to run it with Maven and it worked after making the source infinite.

From the code you provided, the parallelism level doesn't seem
important and can be set to 1 (or the failover strategy can be set
to full). Then using getRuntimeContext().getAttemptNumber() would be
simpler and more reliable.

Regards,
Roman
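Roman's two points (a checkpoint must complete before the failure, and getAttemptNumber() is the more reliable counter) can be illustrated with a small stdlib-only model. It is a sketch under simplifying assumptions, not Flink's API: the class and method names are hypothetical, state is restored only from the last completed checkpoint, anything written after that checkpoint is lost on failure, and the `attemptNumber` field stands in for `getRuntimeContext().getAttemptNumber()`, which grows with every restart of the task whether or not a checkpoint completed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, stdlib-only model (not Flink's API) of a restart counter kept in
// operator state. Assumptions: state is restored only from the last completed
// checkpoint, and anything written after that checkpoint is lost on failure.
class RestartCounterModel {
    private List<Long> lastCheckpoint = null;          // null: no checkpoint has completed yet
    private List<Long> liveState = new ArrayList<>();  // stands in for restartsState
    private int attemptNumber = 0;                     // stands in for getAttemptNumber()

    // Mirrors initializeState() with the "write 0 on the first attempt" fix:
    // isRestored is true only if a completed checkpoint exists.
    private long initializeState() {
        boolean isRestored = lastCheckpoint != null;
        long restarts;
        if (!isRestored) {
            restarts = 0L;
        } else if (lastCheckpoint.isEmpty()) {
            restarts = 1L;
        } else {
            restarts = Collections.max(lastCheckpoint) + 1;
        }
        liveState = new ArrayList<>(Collections.singletonList(restarts));
        return restarts;
    }

    // Each entry says whether a checkpoint completes during that attempt before
    // it fails; after all failing attempts, one final attempt runs to the end.
    // Returns {state-based counter, attempt number} seen by the final attempt.
    static long[] run(boolean... checkpointBeforeFailure) {
        RestartCounterModel job = new RestartCounterModel();
        for (boolean checkpointed : checkpointBeforeFailure) {
            job.initializeState();
            if (checkpointed) {
                job.lastCheckpoint = new ArrayList<>(job.liveState); // snapshot the counter
            }
            job.attemptNumber++; // the failure triggers a new attempt
        }
        long finalCounter = job.initializeState();
        return new long[] {finalCounter, job.attemptNumber};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(false, false))); // [0, 2]
        System.out.println(Arrays.toString(run(true, true)));   // [2, 2]
        System.out.println(Arrays.toString(run(true, false)));  // [1, 2]
    }
}
```

With no checkpoint before either failure, the state-based counter stays at 0 in every attempt (the "restarts: 0" seen before and after the failure), while the attempt number is 2. If only the first attempt was checkpointed, the counter undercounts (1 instead of 2).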

On Fri, Jun 18, 2021 at 6:23 PM Felipe Gutierrez
<[hidden email]> wrote:

>
>
>
> On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan <[hidden email]> wrote:
>>
>> I tried to run the test that you mentioned
>> (WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev.
>> 6f08d0a.
>>
>> In the IDE, I see that:
>> - a checkpoint is never triggered (the sentence is too short, and the
>> checkpoint pause and interval are too large)
>> - the exception is never thrown, so the job is never restarted
>> (currentTimeMillis is incremented but referenceTimeMillisAhead is not)
>>
>> When I add a sleep between elements, set a 10 ms interval and a 0 ms
>> pause, and introduce some random exception, I do see
>
>
> do you mean inside the processElement() method?
>
> what is 0ms pause? do you mean env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?
>
> How do you create a random exception? do you mean not mine SimulatedException?
>
> With the configurations I just mentioned, it does not work for me. I am testing in the terminal with "mvn -Dtest=WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery test". In IntelliJ I get the error "Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder" when I call "env.execute();".
>
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
> at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
> at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842)
> at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804)
> at org.sense.flink.examples.stream.edgent.WordCountFilterQEPTest.integrationTestWithPoisonPillRecovery(WordCountFilterQEPTest.java:210)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
> at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder
>
>
>
>>
>> 2021-06-18 17:37:53,241 INFO
>> org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess  -
>> Attempts restart: 1
>> in the logs.
>>
>> These settings probably differ on the cluster and there is some
>> unrelated exception which causes a restart.
>>
>> Regards,
>> Roman
>>
>> On Fri, Jun 18, 2021 at 12:20 PM Felipe Gutierrez
>> <[hidden email]> wrote:
>> >
>> > I investigated a bit more. I created the same POC on Flink 1.13. I have this ProcessFunction where I want to count the number of times it recovers. I tested with ListState and ValueState, and it seems that during the integration test (only in the integration test) the process hangs in the open() or initializeState() methods.
>> >
>> > https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java
>> >
>> > Here is my integration test: https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154
>> >
>> > If I comment out the ListState or the ValueState, the integration test works. It first throws an exception and I see it in the output. Then, after 2 seconds, it no longer throws exceptions. Then I compare the outputs and they match. But I have no way to count how many restarts happened during the integration test. I can only count the restarts when I run the application on a standalone Flink cluster.
>> >
>> > I don't know. Maybe I have to configure some parameters to work with state in integration tests?
>> >
>> > --
>> > -- Felipe Gutierrez
>> > -- skype: felipe.o.gutierrez
>> >
>> >
>> > On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez <[hidden email]> wrote:
>> >>
>> >> No, it didn't work.
>> >>
>> >> "context.isRestored()" returns true when I run the application on a standalone Flink cluster and it recovers after a failure. When I do the same in an integration test, it does not return true after a failure. I mean, I can log the exception that causes the failure, and initializeState() is called after the failure, but context.isRestored() is false again. I also tried to set the state to 0 the first time with "if (!context.isRestored()) { restartsState.add(0L); }" and it does not work.