Checkpoint


vijikarthi
Hello,

I am writing a small application to understand how checkpointing and recovery work in Flink.

Here is my setup.

Flink 3-node cluster (1 JM, 2 TMs) built from the latest GitHub codebase
HDP 2.4 deployed with just HDFS, used for checkpoints and as the sink
Kafka 0.9.x

The sample code reads from a Kafka topic (1 partition), applies some transformation, and sinks the result to HDFS (RollingSink). For now, I roll over to a new sink file every 1 KB, and a checkpoint is triggered every second.

The messages pumped into Kafka are just the sequence numbers 1, 2, 3, ..., N.

Below are my observations from the setup.

1) Flink uses the checkpoint location on HDFS to maintain the checkpoint information
2) The sink works properly under a normal run (read: no TM failures)

Questions:
1) How do I find the Kafka topic/partition offset details that Flink maintains in the checkpoint (in a readable format)?

2) When I manually simulate TM failures, I sometimes see duplicate data. I was expecting the exactly-once mechanism to work, but found some duplicates. How do I validate whether exactly-once is working or not?

3) How can I simulate and verify backpressure? I have introduced some delay (Thread.sleep) in the job before the sink, but the "backpressure" tab in the UI does not show any indication of whether backpressure is occurring or not.

Appreciate your thoughts.

Regards
Vijay

Re: Checkpoint

Ufuk Celebi
Hey Vijay!

On Mon, Mar 7, 2016 at 8:42 PM, Vijay Srinivasaraghavan
<[hidden email]> wrote:
> 3) How can I simulate and verify backpressure? I have introduced some delay
> (Thread Sleep) in the job before the sink but the "backpressure" tab from UI
> does not show any indication of whether backpressure is working or not.

If a task is slow, it is back pressuring upstream tasks, e.g. if your
transformations have the sleep, the sources should be back pressured.
It can happen that even with the sleep the tasks still produce their
data as fast as they can and hence no back pressure is indicated in
the web interface. You can increase the sleep to check this.

The mechanism used to determine back pressure is based on sampling the
stack traces of running tasks. You can increase the number of samples
and/or decrease the delay between samples via config parameters shown
in [1]. It can happen that the samples miss the back pressure
indicators, but usually the defaults work fine.

[1] https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#jobmanager-web-frontend
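As a rough illustration of the sampling described above, the sketch below turns a batch of stack-trace samples into the status shown in the web interface. The thresholds (OK up to 0.10, LOW up to 0.50, HIGH above that) are quoted from the Flink monitoring documentation of that era as I recall it and may differ between versions; the class and method names are hypothetical and not part of Flink.

```java
/** Hypothetical helper: maps back pressure samples to a status label. */
final class BackPressureSample {
    enum Level { OK, LOW, HIGH }

    /** Fraction of samples in which the task was stuck requesting an output buffer. */
    static double ratio(int blockedSamples, int totalSamples) {
        if (totalSamples <= 0 || blockedSamples < 0 || blockedSamples > totalSamples) {
            throw new IllegalArgumentException("invalid sample counts");
        }
        return (double) blockedSamples / totalSamples;
    }

    /** Thresholds are an assumption taken from the Flink docs, not read from Flink's code. */
    static Level classify(double ratio) {
        if (ratio <= 0.10) {
            return Level.OK;
        }
        if (ratio <= 0.50) {
            return Level.LOW;
        }
        return Level.HIGH;
    }

    public static void main(String[] args) {
        // 3 of 100 samples blocked: still reported as OK.
        System.out.println(classify(ratio(3, 100)));
        // 40 of 100 samples blocked.
        System.out.println(classify(ratio(40, 100)));
        // 95 of 100 samples blocked.
        System.out.println(classify(ratio(95, 100)));
    }
}
```

Under these assumed thresholds, a task that is blocked only occasionally stays at "OK", which is one way the samples can miss the back pressure indicators.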

Re: Checkpoint

vijikarthi
Hi Ufuk,

I have increased the number of samples to 1000 and halved the refresh interval. I pumped a million messages into my Kafka topic; they are read by the KafkaConsumer pipeline and then passed to a transformation step, where I have introduced a sleep (3 sec) for every single message received. The final step is the HDFS sink using the RollingSink API.

jobmanager.web.backpressure.num-samples: 1000
jobmanager.web.backpressure.refresh-interval: 30000


I was hoping the backpressure tab in the UI would display some warning, but I still see the "OK" message.

This makes me wonder whether I am testing the backpressure scenario properly.

Regards
Vijay


Re: Checkpoint

Ufuk Celebi
How many vertices does the web interface show and what parallelism are
you running? If the sleeping operator is chained you will not see
anything.

If your goal is to just see some back pressure warning, you can call
env.disableOperatorChaining() and re-run the program. Does this work?

– Ufuk



Re: Checkpoint

rmetzger0
Hi Vijay,

regarding your other questions:

1) On the TaskManagers, the FlinkKafkaConsumers write the partitions they are going to read to the log. There is currently no way of inspecting the state of a checkpoint in Flink (which, here, is the offsets).
However, once a checkpoint is completed, the Kafka consumer commits the offsets to the Kafka broker. (I could not find a tool to get the committed offsets from the broker, but they are stored either in ZK or in a special topic by the broker. In Kafka 0.8 that's easily doable with kafka.tools.ConsumerOffsetChecker.)

2) Do you see duplicate data written by the rolling file sink? Or do you see it somewhere else?
HDP 2.4 is using Hadoop 2.7.1 so the truncate() of invalid data should actually work properly.
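Since the test messages are just the sequence numbers 1..N, one way to answer the duplicate question is to audit the sink output directly. The sketch below is a hypothetical helper, not part of Flink; it assumes the values have already been read from the finalized sink files into memory, and it reports which numbers appear more than once and which are missing. Values outside 1..N are ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical audit of sink output expected to contain each of 1..n exactly once. */
final class SequenceAudit {
    final List<Long> duplicates = new ArrayList<>();
    final List<Long> missing = new ArrayList<>();

    static SequenceAudit audit(Iterable<Long> values, long n) {
        // Count how often each value occurs in the sink output.
        TreeMap<Long, Integer> counts = new TreeMap<>();
        for (long v : values) {
            counts.merge(v, 1, Integer::sum);
        }
        // Compare against the expected sequence 1..n.
        SequenceAudit result = new SequenceAudit();
        for (long expected = 1; expected <= n; expected++) {
            int count = counts.getOrDefault(expected, 0);
            if (count == 0) {
                result.missing.add(expected);
            } else if (count > 1) {
                result.duplicates.add(expected);
            }
        }
        return result;
    }

    /** True only if every number in 1..n was seen exactly once. */
    boolean exactlyOnce() {
        return duplicates.isEmpty() && missing.isEmpty();
    }

    public static void main(String[] args) {
        List<Long> sinkOutput = Arrays.asList(1L, 2L, 3L, 3L, 5L);
        SequenceAudit a = audit(sinkOutput, 5);
        System.out.println("duplicates=" + a.duplicates
                + " missing=" + a.missing
                + " exactlyOnce=" + a.exactlyOnce());
    }
}
```

Running the audit after each simulated TM failure shows whether the duplicates are in the finalized output itself or were only visible in files that had not been finalized yet.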





Re: Checkpoint

Stephan Ewen
Just to be sure: Is the task whose backpressure you want to monitor the Kafka Source?

There is an open issue that backpressure monitoring does not work for the Kafka Source: https://issues.apache.org/jira/browse/FLINK-3456

To circumvent that, add an "IdentityMapper" after the Kafka source, make sure it is non-chained, and monitor the backpressure on that MapFunction.

Greetings,
Stephan



Re: Checkpoint

vijikarthi
Thanks Ufuk and Stephan.

I have added the identity mapper and disabled chaining. With that, I am able to see the backpressure alert on the identity mapper task.

I have noticed one thing: when I introduce a delay (sleep) in the subsequent task, checkpointing sometimes does not work. I can see the checkpoint being triggered, but the files are not moved out of the "pending" state. I will try to reproduce it to find the pattern, but are you aware of any such scenario?

Regards
Vijay


Re: Checkpoint

Aljoscha Krettek
Hi,
I'm not aware of a problem where pending files are not moved to their final locations, so if you see such behavior it would indicate a bug. Also, the "trigger checkpoint" message does not yet indicate that the checkpoint is actually happening. If you have a very long sleep interval in some of your operations, this will also block checkpointing from happening. A checkpoint at an operation can only be performed when control is currently not inside a user function. We do this to ensure consistency.

If you'd like to help, you can also increase the log level to DEBUG; the RollingSink will then print very detailed information about what it does, for example moving files to/from "pending". If you can reproduce the problem with DEBUG logs, this would help me find out whether there is a problem with Flink.
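To make the point about user functions blocking checkpoints concrete, here is a minimal sketch of the idea using a plain Java monitor. It is an analogy, not Flink's actual implementation: the lock, thread roles, and event names are all assumptions made for illustration. The "task" thread holds the lock while it sleeps inside the user function, so the "checkpoint" can be triggered but is only performed once the function has returned.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

/** Hypothetical model: a checkpoint waits until control leaves the user function. */
final class CheckpointLockDemo {
    static List<String> run(long sleepMillis) throws InterruptedException {
        final Object checkpointLock = new Object();
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch insideUserFunction = new CountDownLatch(1);

        // The task processes one record; the sleep stands in for slow user code.
        Thread task = new Thread(() -> {
            synchronized (checkpointLock) {
                events.add("user function entered");
                insideUserFunction.countDown();
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                events.add("user function returned");
            }
        });

        // The checkpoint is triggered while the task is busy, then waits for the lock.
        Thread checkpointer = new Thread(() -> {
            try {
                insideUserFunction.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            events.add("checkpoint triggered");
            synchronized (checkpointLock) {
                events.add("checkpoint performed");
            }
        });

        task.start();
        checkpointer.start();
        task.join();
        checkpointer.join();
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String event : run(200)) {
            System.out.println(event);
        }
    }
}
```

In this model, "checkpoint performed" can never appear before "user function returned", however early the trigger arrives. With a sleep of several seconds per record and a checkpoint interval of one second, triggers can therefore pile up long before any checkpoint completes.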

Regards,
Aljoscha
