Regarding BucketingSink

classic Classic list List threaded Threaded
16 messages Options
Reply | Threaded
Open this post in threaded view
|

Regarding BucketingSink

Vishal Santoshi
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 

--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).


* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 


* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.


What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.



Am I understanding the process correct and it yes any pointers ?


Regards,


Vishal

Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Vishal Santoshi
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredState is skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 

--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).


* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 


* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.


What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.



Am I understanding the process correct and it yes any pointers ?


Regards,


Vishal


Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Vishal Santoshi
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length


that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredState is skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 

--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).


* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 


* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.


What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.



Am I understanding the process correct and it yes any pointers ?


Regards,


Vishal



Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Vishal Santoshi

-rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length

-rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending

-rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length



This is strange, we had a few retries b'coz of an OOM on one of the TMs and we see this situation. 2 files ( on either sides )  that were dealt with fine but a dangling .pending file. I am sure this is not what is meant to be.   We I think have an edge condition and looking at the code it is not obvious. May be some one who wrote the code can shed some light as to how can this happen.




On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <[hidden email]> wrote:
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length


that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredStatis skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 

--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).


* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 


* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.


What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.



Am I understanding the process correct and it yes any pointers ?


Regards,


Vishal




Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Mu Kong
Hi Vishal,

I have the same concern about save pointing in BucketingSink.
As for your question, I think before the pending files get cleared in handleRestoredBucketState .
They are finalized in notifyCheckpointComplete


I'm looking into this part of the source code now, since we are experiencing some unclosed files after check pointing.
It would be great if you can share more if you find something new about your problem, which might help with our problem.

Best regards,
Mu

On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <[hidden email]> wrote:

-rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length

-rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending

-rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length



This is strange, we had a few retries b'coz of an OOM on one of the TMs and we see this situation. 2 files ( on either sides )  that were dealt with fine but a dangling .pending file. I am sure this is not what is meant to be.   We I think have an edge condition and looking at the code it is not obvious. May be some one who wrote the code can shed some light as to how can this happen.




On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <[hidden email]> wrote:
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length


that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredStatis skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 

--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).


* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 


* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.


What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.



Am I understanding the process correct and it yes any pointers ?


Regards,


Vishal





Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Till Rohrmann
Hi Vishal,

what pending files should indeed get eventually finalized. This happens on a checkpoint complete notification. Thus, what you report seems not right. Maybe Aljoscha can shed a bit more light into the problem.

In order to further debug the problem, it would be really helpful to get access to DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <[hidden email]> wrote:
Hi Vishal,

I have the same concern about save pointing in BucketingSink.
As for your question, I think before the pending files get cleared in handleRestoredBucketState .
They are finalized in notifyCheckpointComplete


I'm looking into this part of the source code now, since we are experiencing some unclosed files after check pointing.
It would be great if you can share more if you find something new about your problem, which might help with our problem.

Best regards,
Mu

On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <[hidden email]> wrote:

-rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length

-rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending

-rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length



This is strange, we had a few retries b'coz of an OOM on one of the TMs and we see this situation. 2 files ( on either sides )  that were dealt with fine but a dangling .pending file. I am sure this is not what is meant to be.   We I think have an edge condition and looking at the code it is not obvious. May be some one who wrote the code can shed some light as to how can this happen.




On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <[hidden email]> wrote:
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length


that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredStatis skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 

--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).


* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 


* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.


What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.



Am I understanding the process correct and it yes any pointers ?


Regards,


Vishal






Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Aljoscha Krettek
Hi,

The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you actually can delete and which of them you have to leave because they will get moved to "final" as part of recovering from a checkpoint on some other parallel instance of the sink.

You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

Best,
Aljoscha

On 19. Feb 2018, at 16:37, Till Rohrmann <[hidden email]> wrote:

Hi Vishal,

what pending files should indeed get eventually finalized. This happens on a checkpoint complete notification. Thus, what you report seems not right. Maybe Aljoscha can shed a bit more light into the problem.

In order to further debug the problem, it would be really helpful to get access to DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <[hidden email]> wrote:
Hi Vishal,

I have the same concern about save pointing in BucketingSink.
As for your question, I think before the pending files get cleared in handleRestoredBucketState .
They are finalized in notifyCheckpointComplete


I'm looking into this part of the source code now, since we are experiencing some unclosed files after check pointing.
It would be great if you can share more if you find something new about your problem, which might help with our problem.

Best regards,
Mu

On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <[hidden email]> wrote:
-rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
-rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
-rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length


This is strange, we had a few retries b'coz of an OOM on one of the TMs and we see this situation. 2 files ( on either sides )  that were dealt with fine but a dangling .pending file. I am sure this is not what is meant to be.   We I think have an edge condition and looking at the code it is not obvious. May be some one who wrote the code can shed some light as to how can this happen.



On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <[hidden email]> wrote:
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length

that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredStatis skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 
--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).

* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 

* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.

What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.


Am I understanding the process correct and it yes any pointers ?

Regards,

Vishal






Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Vishal Santoshi
>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

A little confused. Is that what the framework should do, or us as part of some cleanup job ?





On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you actually can delete and which of them you have to leave because they will get moved to "final" as part of recovering from a checkpoint on some other parallel instance of the sink.

You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

Best,
Aljoscha


On 19. Feb 2018, at 16:37, Till Rohrmann <[hidden email]> wrote:

Hi Vishal,

what pending files should indeed get eventually finalized. This happens on a checkpoint complete notification. Thus, what you report seems not right. Maybe Aljoscha can shed a bit more light into the problem.

In order to further debug the problem, it would be really helpful to get access to DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <[hidden email]> wrote:
Hi Vishal,

I have the same concern about save pointing in BucketingSink.
As for your question, I think before the pending files get cleared in handleRestoredBucketState .
They are finalized in notifyCheckpointComplete


I'm looking into this part of the source code now, since we are experiencing some unclosed files after check pointing.
It would be great if you can share more if you find something new about your problem, which might help with our problem.

Best regards,
Mu

On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <[hidden email]> wrote:
-rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
-rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
-rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length


This is strange, we had a few retries b'coz of an OOM on one of the TMs and we see this situation. 2 files ( on either sides )  that were dealt with fine but a dangling .pending file. I am sure this is not what is meant to be.   We I think have an edge condition and looking at the code it is not obvious. May be some one who wrote the code can shed some light as to how can this happen.



On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <[hidden email]> wrote:
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length

that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredStatis skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 
--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).

* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 

* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.

What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.


Am I understanding the process correct and it yes any pointers ?

Regards,

Vishal







Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Aljoscha Krettek
Hi,

Sorry for the confusion. The framework (Flink) does currently not do any cleanup of pending files, yes.

Best,
Aljoscha

On 19. Feb 2018, at 17:01, Vishal Santoshi <[hidden email]> wrote:

>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

A little confused. Is that what the framework should do, or us as part of some cleanup job ?





On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you actually can delete and which of them you have to leave because they will get moved to "final" as part of recovering from a checkpoint on some other parallel instance of the sink.

You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

Best,
Aljoscha


On 19. Feb 2018, at 16:37, Till Rohrmann <[hidden email]> wrote:

Hi Vishal,

what pending files should indeed get eventually finalized. This happens on a checkpoint complete notification. Thus, what you report seems not right. Maybe Aljoscha can shed a bit more light into the problem.

In order to further debug the problem, it would be really helpful to get access to DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <[hidden email]> wrote:
Hi Vishal,

I have the same concern about save pointing in BucketingSink.
As for your question, I think before the pending files get cleared in handleRestoredBucketState .
They are finalized in notifyCheckpointComplete


I'm looking into this part of the source code now, since we are experiencing some unclosed files after check pointing.
It would be great if you can share more if you find something new about your problem, which might help with our problem.

Best regards,
Mu

On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <[hidden email]> wrote:
-rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
-rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
-rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length


This is strange, we had a few retries b'coz of an OOM on one of the TMs and we see this situation. 2 files ( on either sides )  that were dealt with fine but a dangling .pending file. I am sure this is not what is meant to be.   We I think have an edge condition and looking at the code it is not obvious. May be some one who wrote the code can shed some light as to how can this happen.



On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <[hidden email]> wrote:
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length

that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredStatis skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 
--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).

* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 

* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.

What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.


Am I understanding the process correct and it yes any pointers ?

Regards,

Vishal








Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Vishal Santoshi
That is fine, till flink assure at-least-once semantics ? 

If the contents of a .pending file, through the turbulence ( restarts etc )  are assured to be in another file than anything starting with "_" underscore will by default ignored by hadoop ( hive or MR etc ).



On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

Sorry for the confusion. The framework (Flink) does currently not do any cleanup of pending files, yes.

Best,
Aljoscha


On 19. Feb 2018, at 17:01, Vishal Santoshi <[hidden email]> wrote:

>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

A little confused. Is that what the framework should do, or us as part of some cleanup job ?





On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you actually can delete and which of them you have to leave because they will get moved to "final" as part of recovering from a checkpoint on some other parallel instance of the sink.

You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

Best,
Aljoscha


On 19. Feb 2018, at 16:37, Till Rohrmann <[hidden email]> wrote:

Hi Vishal,

what pending files should indeed get eventually finalized. This happens on a checkpoint complete notification. Thus, what you report seems not right. Maybe Aljoscha can shed a bit more light into the problem.

In order to further debug the problem, it would be really helpful to get access to DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <[hidden email]> wrote:
Hi Vishal,

I have the same concern about save pointing in BucketingSink.
As for your question, I think before the pending files get cleared in handleRestoredBucketState .
They are finalized in notifyCheckpointComplete


I'm looking into this part of the source code now, since we are experiencing some unclosed files after check pointing.
It would be great if you can share more if you find something new about your problem, which might help with our problem.

Best regards,
Mu

On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <[hidden email]> wrote:
-rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
-rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
-rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length


This is strange, we had a few retries b'coz of an OOM on one of the TMs and we see this situation. 2 files ( on either sides )  that were dealt with fine but a dangling .pending file. I am sure this is not what is meant to be.   We I think have an edge condition and looking at the code it is not obvious. May be some one who wrote the code can shed some light as to how can this happen.



On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <[hidden email]> wrote:
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length

that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredStatis skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 
--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).

* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 

* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.

What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.


Am I understanding the process correct and it yes any pointers ?

Regards,

Vishal









Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Vishal Santoshi
Sorry, but just wanted to confirm that  the assertion "at-least-once"  delivery  true if there is a dangling pending file ? 

On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <[hidden email]> wrote:
That is fine, till flink assure at-least-once semantics ? 

If the contents of a .pending file, through the turbulence ( restarts etc )  are assured to be in another file than anything starting with "_" underscore will by default ignored by hadoop ( hive or MR etc ).



On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

Sorry for the confusion. The framework (Flink) does currently not do any cleanup of pending files, yes.

Best,
Aljoscha


On 19. Feb 2018, at 17:01, Vishal Santoshi <[hidden email]> wrote:

>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

A little confused. Is that what the framework should do, or us as part of some cleanup job ?





On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you actually can delete and which of them you have to leave because they will get moved to "final" as part of recovering from a checkpoint on some other parallel instance of the sink.

You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

Best,
Aljoscha


On 19. Feb 2018, at 16:37, Till Rohrmann <[hidden email]> wrote:

Hi Vishal,

what pending files should indeed get eventually finalized. This happens on a checkpoint complete notification. Thus, what you report seems not right. Maybe Aljoscha can shed a bit more light into the problem.

In order to further debug the problem, it would be really helpful to get access to DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <[hidden email]> wrote:
Hi Vishal,

I have the same concern about save pointing in BucketingSink.
As for your question, I think before the pending files get cleared in handleRestoredBucketState .
They are finalized in notifyCheckpointComplete


I'm looking into this part of the source code now, since we are experiencing some unclosed files after check pointing.
It would be great if you can share more if you find something new about your problem, which might help with our problem.

Best regards,
Mu

On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <[hidden email]> wrote:
-rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
-rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
-rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length


This is strange, we had a few retries b'coz of an OOM on one of the TMs and we see this situation. 2 files ( on either sides )  that were dealt with fine but a dangling .pending file. I am sure this is not what is meant to be.   We I think have an edge condition and looking at the code it is not obvious. May be some one who wrote the code can shed some light as to how can this happen.



On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <[hidden email]> wrote:
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length

that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredStatis skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 
--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).

* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 

* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.

What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.


Am I understanding the process correct and it yes any pointers ?

Regards,

Vishal










Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Mu Kong
Hi Aljoscha,

Thanks for confirming that fact that Flink doesn't clean up pending files.
Is that safe to clean(remove) all the pending files after cancel(w/ or w/o savepointing) or failover?
If we do that, will we lose some data?

Thanks!

Best,
Mu




On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <[hidden email]> wrote:
Sorry, but just wanted to confirm that  the assertion "at-least-once"  delivery  true if there is a dangling pending file ? 

On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <[hidden email]> wrote:
That is fine, till flink assure at-least-once semantics ? 

If the contents of a .pending file, through the turbulence ( restarts etc )  are assured to be in another file than anything starting with "_" underscore will by default ignored by hadoop ( hive or MR etc ).



On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

Sorry for the confusion. The framework (Flink) does currently not do any cleanup of pending files, yes.

Best,
Aljoscha


On 19. Feb 2018, at 17:01, Vishal Santoshi <[hidden email]> wrote:

>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

A little confused. Is that what the framework should do, or us as part of some cleanup job ?





On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you actually can delete and which of them you have to leave because they will get moved to "final" as part of recovering from a checkpoint on some other parallel instance of the sink.

You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

Best,
Aljoscha


On 19. Feb 2018, at 16:37, Till Rohrmann <[hidden email]> wrote:

Hi Vishal,

what pending files should indeed get eventually finalized. This happens on a checkpoint complete notification. Thus, what you report seems not right. Maybe Aljoscha can shed a bit more light into the problem.

In order to further debug the problem, it would be really helpful to get access to DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <[hidden email]> wrote:
Hi Vishal,

I have the same concern about save pointing in BucketingSink.
As for your question, I think before the pending files get cleared in handleRestoredBucketState .
They are finalized in notifyCheckpointComplete


I'm looking into this part of the source code now, since we are experiencing some unclosed files after check pointing.
It would be great if you can share more if you find something new about your problem, which might help with our problem.

Best regards,
Mu

On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <[hidden email]> wrote:
-rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
-rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
-rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length


This is strange, we had a few retries b'coz of an OOM on one of the TMs and we see this situation. 2 files ( on either sides )  that were dealt with fine but a dangling .pending file. I am sure this is not what is meant to be.   We I think have an edge condition and looking at the code it is not obvious. May be some one who wrote the code can shed some light as to how can this happen.



On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <[hidden email]> wrote:
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length

that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredStatis skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 
--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).

* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 

* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.

What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.


Am I understanding the process correct and it yes any pointers ?

Regards,

Vishal











Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Fabian Hueske-2
Hi Vishal, hi Mu,

After the savepoint state has been written, the sink might start new .in-progress files. These files are not part of the savepoint but renamed to .pending in close().
On restore all pending files that are part of the savepoint are moved into final state (and possibly truncated). See handlePendingInProgressFiles() method.
Pending files that are not part of the savepoint (because they were created later between taking the savepoint and shutting the job down) are not touched and remain as .pending files.

These should be the .pending files that you observe. Since they contain data that is not part of the savepoint, it should be save to delete them.
If you keep them, you will have at-least-once output.

Best, Fabian


2018-02-21 5:04 GMT+01:00 Mu Kong <[hidden email]>:
Hi Aljoscha,

Thanks for confirming that fact that Flink doesn't clean up pending files.
Is that safe to clean(remove) all the pending files after cancel(w/ or w/o savepointing) or failover?
If we do that, will we lose some data?

Thanks!

Best,
Mu




On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <[hidden email]> wrote:
Sorry, but just wanted to confirm that  the assertion "at-least-once"  delivery  true if there is a dangling pending file ? 

On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <[hidden email]> wrote:
That is fine, till flink assure at-least-once semantics ? 

If the contents of a .pending file, through the turbulence ( restarts etc )  are assured to be in another file than anything starting with "_" underscore will by default ignored by hadoop ( hive or MR etc ).



On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

Sorry for the confusion. The framework (Flink) does currently not do any cleanup of pending files, yes.

Best,
Aljoscha


On 19. Feb 2018, at 17:01, Vishal Santoshi <[hidden email]> wrote:

>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

A little confused. Is that what the framework should do, or us as part of some cleanup job ?





On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you actually can delete and which of them you have to leave because they will get moved to "final" as part of recovering from a checkpoint on some other parallel instance of the sink.

You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

Best,
Aljoscha


On 19. Feb 2018, at 16:37, Till Rohrmann <[hidden email]> wrote:

Hi Vishal,

what pending files should indeed get eventually finalized. This happens on a checkpoint complete notification. Thus, what you report seems not right. Maybe Aljoscha can shed a bit more light into the problem.

In order to further debug the problem, it would be really helpful to get access to DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <[hidden email]> wrote:
Hi Vishal,

I have the same concern about save pointing in BucketingSink.
As for your question, I think before the pending files get cleared in handleRestoredBucketState .
They are finalized in notifyCheckpointComplete


I'm looking into this part of the source code now, since we are experiencing some unclosed files after check pointing.
It would be great if you can share more if you find something new about your problem, which might help with our problem.

Best regards,
Mu

On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <[hidden email]> wrote:
-rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
-rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
-rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length


This is strange, we had a few retries b'coz of an OOM on one of the TMs and we see this situation. 2 files ( on either sides )  that were dealt with fine but a dangling .pending file. I am sure this is not what is meant to be.   We I think have an edge condition and looking at the code it is not obvious. May be some one who wrote the code can shed some light as to how can this happen.



On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <[hidden email]> wrote:
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length

that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredStatis skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 
--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).

* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 

* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.

What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.


Am I understanding the process correct and it yes any pointers ?

Regards,

Vishal












Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Vishal Santoshi
Thank you Fabian, 

    What is more important ( and I think you might have addressed it in your post so sorry for being a little obtuse ) is that deleting them does not violate "at-least-once" delivery.  And if that is a definite than we are fine with it, though we will test it further.

Thanks and Regards.



On Wed, Feb 21, 2018 at 5:34 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal, hi Mu,

After the savepoint state has been written, the sink might start new .in-progress files. These files are not part of the savepoint but renamed to .pending in close().
On restore all pending files that are part of the savepoint are moved into final state (and possibly truncated). See handlePendingInProgressFiles() method.
Pending files that are not part of the savepoint (because they were created later between taking the savepoint and shutting the job down) are not touched and remain as .pending files.

These should be the .pending files that you observe. Since they contain data that is not part of the savepoint, it should be save to delete them.
If you keep them, you will have at-least-once output.

Best, Fabian


2018-02-21 5:04 GMT+01:00 Mu Kong <[hidden email]>:
Hi Aljoscha,

Thanks for confirming that fact that Flink doesn't clean up pending files.
Is that safe to clean(remove) all the pending files after cancel(w/ or w/o savepointing) or failover?
If we do that, will we lose some data?

Thanks!

Best,
Mu




On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <[hidden email]> wrote:
Sorry, but just wanted to confirm that  the assertion "at-least-once"  delivery  true if there is a dangling pending file ? 

On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <[hidden email]> wrote:
That is fine, till flink assure at-least-once semantics ? 

If the contents of a .pending file, through the turbulence ( restarts etc )  are assured to be in another file than anything starting with "_" underscore will by default ignored by hadoop ( hive or MR etc ).



On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

Sorry for the confusion. The framework (Flink) does currently not do any cleanup of pending files, yes.

Best,
Aljoscha


On 19. Feb 2018, at 17:01, Vishal Santoshi <[hidden email]> wrote:

>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

A little confused. Is that what the framework should do, or us as part of some cleanup job ?





On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you actually can delete and which of them you have to leave because they will get moved to "final" as part of recovering from a checkpoint on some other parallel instance of the sink.

You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

Best,
Aljoscha


On 19. Feb 2018, at 16:37, Till Rohrmann <[hidden email]> wrote:

Hi Vishal,

what pending files should indeed get eventually finalized. This happens on a checkpoint complete notification. Thus, what you report seems not right. Maybe Aljoscha can shed a bit more light into the problem.

In order to further debug the problem, it would be really helpful to get access to DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <[hidden email]> wrote:
Hi Vishal,

I have the same concern about save pointing in BucketingSink.
As for your question, I think before the pending files get cleared in handleRestoredBucketState .
They are finalized in notifyCheckpointComplete


I'm looking into this part of the source code now, since we are experiencing some unclosed files after check pointing.
It would be great if you can share more if you find something new about your problem, which might help with our problem.

Best regards,
Mu

On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <[hidden email]> wrote:
-rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
-rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
-rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length


This is strange, we had a few retries b'coz of an OOM on one of the TMs and we see this situation. 2 files ( on either sides )  that were dealt with fine but a dangling .pending file. I am sure this is not what is meant to be.   We I think have an edge condition and looking at the code it is not obvious. May be some one who wrote the code can shed some light as to how can this happen.



On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <[hidden email]> wrote:
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length

that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredStatis skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 
--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).

* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 

* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.

What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.


Am I understanding the process correct and it yes any pointers ?

Regards,

Vishal













Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Vishal Santoshi
>> After the savepoint state has been written, the sink might start new .in-progress files. These files are not part of the savepoint but renamed to .pending in close().
>> On restore all pending files that are part of the savepoint are moved into final state (and possibly truncated). See handlePendingInProgressFiles() method.
>> Pending files that are not part of the savepoint (because they were created later between taking the savepoint and shutting the job down) are not touched and remain >> as .pending files.

>> These should be the .pending files that you observe. Since they contain data that is not part of the savepoint, it should be save to delete them.
>> If you keep them, you will have at-least-once output.


Is the above also possible with in-progress file ?  I had a situation where we see such  a dangling file through a restart on error.


On Wed, Feb 21, 2018 at 8:52 AM, Vishal Santoshi <[hidden email]> wrote:
Thank you Fabian, 

    What is more important ( and I think you might have addressed it in your post so sorry for being a little obtuse ) is that deleting them does not violate "at-least-once" delivery.  And if that is a definite than we are fine with it, though we will test it further.

Thanks and Regards.



On Wed, Feb 21, 2018 at 5:34 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal, hi Mu,

After the savepoint state has been written, the sink might start new .in-progress files. These files are not part of the savepoint but renamed to .pending in close().
On restore all pending files that are part of the savepoint are moved into final state (and possibly truncated). See handlePendingInProgressFiles() method.
Pending files that are not part of the savepoint (because they were created later between taking the savepoint and shutting the job down) are not touched and remain as .pending files.

These should be the .pending files that you observe. Since they contain data that is not part of the savepoint, it should be save to delete them.
If you keep them, you will have at-least-once output.

Best, Fabian


2018-02-21 5:04 GMT+01:00 Mu Kong <[hidden email]>:
Hi Aljoscha,

Thanks for confirming that fact that Flink doesn't clean up pending files.
Is that safe to clean(remove) all the pending files after cancel(w/ or w/o savepointing) or failover?
If we do that, will we lose some data?

Thanks!

Best,
Mu




On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <[hidden email]> wrote:
Sorry, but just wanted to confirm that  the assertion "at-least-once"  delivery  true if there is a dangling pending file ? 

On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <[hidden email]> wrote:
That is fine, till flink assure at-least-once semantics ? 

If the contents of a .pending file, through the turbulence ( restarts etc )  are assured to be in another file than anything starting with "_" underscore will by default ignored by hadoop ( hive or MR etc ).



On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

Sorry for the confusion. The framework (Flink) does currently not do any cleanup of pending files, yes.

Best,
Aljoscha


On 19. Feb 2018, at 17:01, Vishal Santoshi <[hidden email]> wrote:

>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

A little confused. Is that what the framework should do, or us as part of some cleanup job ?





On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you actually can delete and which of them you have to leave because they will get moved to "final" as part of recovering from a checkpoint on some other parallel instance of the sink.

You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

Best,
Aljoscha


On 19. Feb 2018, at 16:37, Till Rohrmann <[hidden email]> wrote:

Hi Vishal,

what pending files should indeed get eventually finalized. This happens on a checkpoint complete notification. Thus, what you report seems not right. Maybe Aljoscha can shed a bit more light into the problem.

In order to further debug the problem, it would be really helpful to get access to DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <[hidden email]> wrote:
Hi Vishal,

I have the same concern about save pointing in BucketingSink.
As for your question, I think before the pending files get cleared in handleRestoredBucketState .
They are finalized in notifyCheckpointComplete


I'm looking into this part of the source code now, since we are experiencing some unclosed files after check pointing.
It would be great if you can share more if you find something new about your problem, which might help with our problem.

Best regards,
Mu

On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <[hidden email]> wrote:
-rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
-rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
-rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length


This is strange, we had a few retries b'coz of an OOM on one of the TMs and we see this situation. 2 files ( on either sides )  that were dealt with fine but a dangling .pending file. I am sure this is not what is meant to be.   We I think have an edge condition and looking at the code it is not obvious. May be some one who wrote the code can shed some light as to how can this happen.



On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <[hidden email]> wrote:
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length

that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredStatis skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 
--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).

* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 

* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.

What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.


Am I understanding the process correct and it yes any pointers ?

Regards,

Vishal














Reply | Threaded
Open this post in threaded view
|

Re: Regarding BucketingSink

Aljoscha Krettek
I would expect that to be possible as well, yes.

On 21. Apr 2018, at 17:33, Vishal Santoshi <[hidden email]> wrote:

>> After the savepoint state has been written, the sink might start new .in-progress files. These files are not part of the savepoint but renamed to .pending in close().
>> On restore all pending files that are part of the savepoint are moved into final state (and possibly truncated). See handlePendingInProgressFiles() method.
>> Pending files that are not part of the savepoint (because they were created later between taking the savepoint and shutting the job down) are not touched and remain >> as .pending files.

>> These should be the .pending files that you observe. Since they contain data that is not part of the savepoint, it should be save to delete them.
>> If you keep them, you will have at-least-once output.


Is the above also possible with in-progress file ?  I had a situation where we see such  a dangling file through a restart on error.


On Wed, Feb 21, 2018 at 8:52 AM, Vishal Santoshi <[hidden email]> wrote:
Thank you Fabian, 

    What is more important ( and I think you might have addressed it in your post so sorry for being a little obtuse ) is that deleting them does not violate "at-least-once" delivery.  And if that is a definite than we are fine with it, though we will test it further.

Thanks and Regards.



On Wed, Feb 21, 2018 at 5:34 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal, hi Mu,

After the savepoint state has been written, the sink might start new .in-progress files. These files are not part of the savepoint but renamed to .pending in close().
On restore all pending files that are part of the savepoint are moved into final state (and possibly truncated). See handlePendingInProgressFiles() method.
Pending files that are not part of the savepoint (because they were created later between taking the savepoint and shutting the job down) are not touched and remain as .pending files.

These should be the .pending files that you observe. Since they contain data that is not part of the savepoint, it should be save to delete them.
If you keep them, you will have at-least-once output.

Best, Fabian


2018-02-21 5:04 GMT+01:00 Mu Kong <[hidden email]>:
Hi Aljoscha,

Thanks for confirming that fact that Flink doesn't clean up pending files.
Is that safe to clean(remove) all the pending files after cancel(w/ or w/o savepointing) or failover?
If we do that, will we lose some data?

Thanks!

Best,
Mu




On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <[hidden email]> wrote:
Sorry, but just wanted to confirm that  the assertion "at-least-once"  delivery  true if there is a dangling pending file ? 

On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <[hidden email]> wrote:
That is fine, till flink assure at-least-once semantics ? 

If the contents of a .pending file, through the turbulence ( restarts etc )  are assured to be in another file than anything starting with "_" underscore will by default ignored by hadoop ( hive or MR etc ).



On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

Sorry for the confusion. The framework (Flink) does currently not do any cleanup of pending files, yes.

Best,
Aljoscha


On 19. Feb 2018, at 17:01, Vishal Santoshi <[hidden email]> wrote:

>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

A little confused. Is that what the framework should do, or us as part of some cleanup job ?





On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you actually can delete and which of them you have to leave because they will get moved to "final" as part of recovering from a checkpoint on some other parallel instance of the sink.

You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.

Best,
Aljoscha


On 19. Feb 2018, at 16:37, Till Rohrmann <[hidden email]> wrote:

Hi Vishal,

what pending files should indeed get eventually finalized. This happens on a checkpoint complete notification. Thus, what you report seems not right. Maybe Aljoscha can shed a bit more light into the problem.

In order to further debug the problem, it would be really helpful to get access to DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <[hidden email]> wrote:
Hi Vishal,

I have the same concern about save pointing in BucketingSink.
As for your question, I think before the pending files get cleared in handleRestoredBucketState .
They are finalized in notifyCheckpointComplete


I'm looking into this part of the source code now, since we are experiencing some unclosed files after check pointing.
It would be great if you can share more if you find something new about your problem, which might help with our problem.

Best regards,
Mu

On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <[hidden email]> wrote:
-rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
-rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
-rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length


This is strange, we had a few retries b'coz of an OOM on one of the TMs and we see this situation. 2 files ( on either sides )  that were dealt with fine but a dangling .pending file. I am sure this is not what is meant to be.   We I think have an edge condition and looking at the code it is not obvious. May be some one who wrote the code can shed some light as to how can this happen.



On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <[hidden email]> wrote:
without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file ( finalized during resume ) 

-rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length

that does makes much more sense. 

I guess we should document --allowNonRestoredState better ? It seems it actually drops state ?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <[hidden email]> wrote:
This is 1.4 BTW.  I am not sure that I am reading this correctly but the lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile


is called from close()




and that moves files to pending state.  That I would presume is called when one does a cancel.



2. The restore on resume 


calls 

handleRestoredBucketState


clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am not sure also whether --allowNonRestoredStatis skipping getting the state . At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear what it does if we add an operator ( GDF I think will add a new operator in the DAG without state even if stateful, in my case the Map operator is not even stateful )


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <[hidden email]> wrote:
What should be the behavior of BucketingSink vis a vis state ( pending , inprogess and finalization ) when we suspend and resume ?

So I did this

* I had a pipe writing to hdfs suspend and resume using 
--allowNonRestoredState as in I had added a harmless MapOperator ( stateless ).

* I see that a file on hdfs, the file being written to ( before the cancel with save point )  go into a pending state  _part-0-21.pending 

* I see a new file being written to in the resumed pipe    _part-0-22.in-progress.

What  I do not see is the file in  _part-0-21.pending being finalized ( as in renamed to a just part-0-21. I would have assumed that would be the case in this controlled suspend/resume circumstance. Further it is a rename and hdfs mv is not an expensive operation.


Am I understanding the process correct and it yes any pointers ?

Regards,

Vishal