Behavior of the cancel command

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Behavior of the cancel command

Jürgen Thomann
Hi,

I had some time ago problems with writing data to Hadoop with the
BucketingSink and losing data in case of cancel with savepoint because
flush/sync command was interrupted. I tried changing Hadoop settings as
suggested but had no luck at the end and looked into the Flink code. If
I understand the code correctly it behaves the following way:

1. Start a Watchdog thread if we have a cancellation timeout set
2. invoke cancel on the sink/task, but do not wait for it to finish
3. destroy buffer pool and a release resources
4. send initial interrupt to the sink/task
5. call join on the sink/task and ignore InterruptedException
6. let the watchdog send more interrupts if needed and throw fatal error
if timeout is reached

In my case the BucketingSink does not has enough time to flush
everything before the initial interrupt is sent and some files are not
closed properly which causes the missing data in Hadoop in my understanding.

Is my understanding correct and if yes, do you know a way to get around
this behavior to let the close function finish the sync for all files?

Best,
Jürgen
Reply | Threaded
Open this post in threaded view
|

Re: Behavior of the cancel command

Aljoscha Krettek
Hi Jürgen,
Is there missing data with respect to what should have been written at the time of the cancel or when the last checkpoint (or in that case, the savepoint) was performed. I’m asking because the cancel command is only sent out once the savepoint has been completed, as can be seen at [1]. If the savepoint is complete this also means that the snapshot method of the BucketingSink must have done it’s work, i.e. that it also flushed all files, which is done in [2]. There’s always the possibility of a bug, however, so we’ll have to look into this together.

Best,
Aljoscha



On 27. Apr 2017, at 10:27, Jürgen Thomann <[hidden email]> wrote:

Hi,

I had some time ago problems with writing data to Hadoop with the BucketingSink and losing data in case of cancel with savepoint because flush/sync command was interrupted. I tried changing Hadoop settings as suggested but had no luck at the end and looked into the Flink code. If I understand the code correctly it behaves the following way:

1. Start a Watchdog thread if we have a cancellation timeout set
2. invoke cancel on the sink/task, but do not wait for it to finish
3. destroy buffer pool and a release resources
4. send initial interrupt to the sink/task
5. call join on the sink/task and ignore InterruptedException
6. let the watchdog send more interrupts if needed and throw fatal error if timeout is reached

In my case the BucketingSink does not has enough time to flush everything before the initial interrupt is sent and some files are not closed properly which causes the missing data in Hadoop in my understanding.

Is my understanding correct and if yes, do you know a way to get around this behavior to let the close function finish the sync for all files?

Best,
Jürgen

Reply | Threaded
Open this post in threaded view
|

Re: Behavior of the cancel command

Jürgen Thomann
Hi Aljoscha,

In my case the valid-length file created contains a value which e.g. says 100 MB are valid to read for exactly once but the file with the data is only 95 MB large. As I understand it the valid-length file contains the length of the file at the checkpoint. This does also not happen for all files (3 HDFS sinks each with a parallelism of 2). For some parts the file size and the value in the valid-length file match exactly.

After looking now over the checkpoint code in BucketingSink I looked into the hsync behavior again and found the following page: http://stackoverflow.com/questions/32231105/why-is-hsync-not-flushing-my-hdfs-file
After this I downloaded the file with the hdfs dfs tool and actually the file is now even larger than the valid-length file. I checked this against the things I did before (Impala and hive select count query, and Hue download of files and wc -l) and this 3 ways result in the same amount of lines but hdfs dfs -cat <filename> | wc -l gives a much larger value.

So my conclusion would be that the data is written and not exactly lost as I thought, but for my use case not visible because the files are not properly closed during cancel and the namenode is not aware of the flushed data. So I could imagine 2 ways out of this: 1. implement the hsync as stated at the Stack Overflow page or 2. ensure that files are properly closed during cancel.

Best,
Jürgen

On 28.04.2017 17:38, Aljoscha Krettek wrote:
Hi Jürgen,
Is there missing data with respect to what should have been written at the time of the cancel or when the last checkpoint (or in that case, the savepoint) was performed. I’m asking because the cancel command is only sent out once the savepoint has been completed, as can be seen at [1]. If the savepoint is complete this also means that the snapshot method of the BucketingSink must have done it’s work, i.e. that it also flushed all files, which is done in [2]. There’s always the possibility of a bug, however, so we’ll have to look into this together.

Best,
Aljoscha



On 27. Apr 2017, at 10:27, Jürgen Thomann <[hidden email]> wrote:

Hi,

I had some time ago problems with writing data to Hadoop with the BucketingSink and losing data in case of cancel with savepoint because flush/sync command was interrupted. I tried changing Hadoop settings as suggested but had no luck at the end and looked into the Flink code. If I understand the code correctly it behaves the following way:

1. Start a Watchdog thread if we have a cancellation timeout set
2. invoke cancel on the sink/task, but do not wait for it to finish
3. destroy buffer pool and a release resources
4. send initial interrupt to the sink/task
5. call join on the sink/task and ignore InterruptedException
6. let the watchdog send more interrupts if needed and throw fatal error if timeout is reached

In my case the BucketingSink does not has enough time to flush everything before the initial interrupt is sent and some files are not closed properly which causes the missing data in Hadoop in my understanding.

Is my understanding correct and if yes, do you know a way to get around this behavior to let the close function finish the sync for all files?

Best,
Jürgen

Reply | Threaded
Open this post in threaded view
|

Re: Behavior of the cancel command

Aljoscha Krettek
Thanks for looking into this, Jürgen!


On 29. Apr 2017, at 09:15, Jürgen Thomann <[hidden email]> wrote:

Hi Aljoscha,

In my case the valid-length file created contains a value which e.g. says 100 MB are valid to read for exactly once but the file with the data is only 95 MB large. As I understand it the valid-length file contains the length of the file at the checkpoint. This does also not happen for all files (3 HDFS sinks each with a parallelism of 2). For some parts the file size and the value in the valid-length file match exactly.

After looking now over the checkpoint code in BucketingSink I looked into the hsync behavior again and found the following page: http://stackoverflow.com/questions/32231105/why-is-hsync-not-flushing-my-hdfs-file
After this I downloaded the file with the hdfs dfs tool and actually the file is now even larger than the valid-length file. I checked this against the things I did before (Impala and hive select count query, and Hue download of files and wc -l) and this 3 ways result in the same amount of lines but hdfs dfs -cat <filename> | wc -l gives a much larger value.

So my conclusion would be that the data is written and not exactly lost as I thought, but for my use case not visible because the files are not properly closed during cancel and the namenode is not aware of the flushed data. So I could imagine 2 ways out of this: 1. implement the hsync as stated at the Stack Overflow page or 2. ensure that files are properly closed during cancel.

Best,
Jürgen

On 28.04.2017 17:38, Aljoscha Krettek wrote:
Hi Jürgen,
Is there missing data with respect to what should have been written at the time of the cancel or when the last checkpoint (or in that case, the savepoint) was performed. I’m asking because the cancel command is only sent out once the savepoint has been completed, as can be seen at [1]. If the savepoint is complete this also means that the snapshot method of the BucketingSink must have done it’s work, i.e. that it also flushed all files, which is done in [2]. There’s always the possibility of a bug, however, so we’ll have to look into this together.

Best,
Aljoscha



On 27. Apr 2017, at 10:27, Jürgen Thomann <[hidden email]> wrote:

Hi,

I had some time ago problems with writing data to Hadoop with the BucketingSink and losing data in case of cancel with savepoint because flush/sync command was interrupted. I tried changing Hadoop settings as suggested but had no luck at the end and looked into the Flink code. If I understand the code correctly it behaves the following way:

1. Start a Watchdog thread if we have a cancellation timeout set
2. invoke cancel on the sink/task, but do not wait for it to finish
3. destroy buffer pool and a release resources
4. send initial interrupt to the sink/task
5. call join on the sink/task and ignore InterruptedException
6. let the watchdog send more interrupts if needed and throw fatal error if timeout is reached

In my case the BucketingSink does not has enough time to flush everything before the initial interrupt is sent and some files are not closed properly which causes the missing data in Hadoop in my understanding.

Is my understanding correct and if yes, do you know a way to get around this behavior to let the close function finish the sync for all files?

Best,
Jürgen