Hi,

I have some questions about the new StreamingFileSink in 1.6.

My use case is pretty simple. I have a Cassandra table with 150 million rows, partitioned into buckets of 100,000 rows. My job is to export each bucket to a file (1 bucket = 1 file), so the job is designed like this:
1) the source gets the bucket list,
2) a flatMap task fetches the rows matching each bucket and emits all 100,000 rows from Cassandra to the collector,
3) a StreamingFileSink writes each row into a file per bucket (RowFormat).

Checkpointing is enabled, every 10 s. The rolling policy is OnCheckpointRollingPolicy, and the bucket assigner assigns by bucketId (my bucketId, not the sink's one :).

My problem is that at the end of the job I only have in-progress part files for each bucket. I do not understand how I can trigger the finalization of the sink and have the bucket part files committed.

So I read the code of the StreamingFileSink and Bucket classes. If I have understood correctly, the in-progress bucket files can be closed (closePartFile method of the Bucket) and moved to the "pending" state according to the rolling policy, where they wait for a checkpoint to be moved to the "finished" state. So the rolling policy can roll part files on each row, on each BucketInterval, or on each checkpoint. In my case, with the OnCheckpointRollingPolicy, it is only on each checkpoint. Am I right? And when a checkpoint is successful, all the pending files are moved to the "finished" state and are usable by other jobs.

This is where I start getting lost. One thing surprised me in the code of the close method of the StreamingFileSink: it discards all active buckets since the last successful checkpoint! But at the end of a successful job, no checkpoint is triggered automatically if the minimum interval since the last checkpoint has not expired. So what happens to the data written since the last checkpoint? (Is this sink only for endless streams?)

How do I get all my files, with all my data, into the "finished" state when my job finishes successfully? Do I need to trigger a checkpoint manually? Is there a better-fitting sink for my use case? Should I use another rolling policy? Even with the bucket interval, there is still a window between the interval and the end of the job during which some part files are not closed and committed.

Even in the case of an endless stream, I suggest improving the behavior of the close method by calling closePartFile on each active bucket, so that all valid data since the last checkpoint can be moved to the pending state, waiting for the checkpoint at the end of the job. This seems to be what the BucketingSink does. I can do a PR for this.

I'm open to all suggestions :)

Regards,
|
Hi Benoit,
Thanks for using the StreamingFileSink. My answers/explanations are inlined. In most of your observations, you are correct.
1) The in-progress files are closed based on the rolling policy and they are put in pending state, 2) they are staged for commit when the “next” checkpoint arrives after they are put to pending state, and 3) they are published when that checkpoint is declared successful.
The OnCheckpointRollingPolicy rolls the in-progress file when it receives a checkpoint, and publishes the file when the checkpoint is complete. But in general, you can specify your own rolling policy when using row formats. There, you can specify to roll:
1) based on an inactivity interval or on the interval since the in-progress file was created,
2) based on the size of the file,
3) on every checkpoint,
4) on a combination of the above.
I would recommend checking the DefaultRollingPolicy for an example.
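To make the "combination of the above" option concrete, here is a minimal sketch in plain Java. It is a toy predicate, not Flink's RollingPolicy interface: the class name, the method signature, and the three thresholds are assumptions picked only for illustration.

```java
import java.time.Duration;

/** Toy model of a combined rolling decision; NOT Flink's RollingPolicy API. */
final class RollDecisionSketch {
    // Hypothetical thresholds, chosen only for this illustration.
    static final long MAX_PART_SIZE_BYTES = 128L * 1024 * 1024;
    static final Duration ROLLOVER_INTERVAL = Duration.ofSeconds(60);
    static final Duration INACTIVITY_INTERVAL = Duration.ofSeconds(30);

    /**
     * Returns true if the in-progress part file should be closed
     * and moved to the pending state.
     */
    static boolean shouldRoll(long sizeBytes, long createdAtMs, long lastWriteMs,
                              long nowMs, boolean onCheckpoint) {
        boolean tooBig = sizeBytes >= MAX_PART_SIZE_BYTES;                      // size of the file
        boolean tooOld = nowMs - createdAtMs >= ROLLOVER_INTERVAL.toMillis();   // interval since creation
        boolean idle = nowMs - lastWriteMs >= INACTIVITY_INTERVAL.toMillis();   // inactivity interval
        // Any single condition is enough to roll, including an incoming checkpoint.
        return onCheckpoint || tooBig || tooOld || idle;
    }

    public static void main(String[] args) {
        // Small, young, recently written file, no checkpoint in flight.
        System.out.println(shouldRoll(1024, 0, 5_000, 10_000, false));
        // Same file, but a checkpoint arrives.
        System.out.println(shouldRoll(1024, 0, 5_000, 10_000, true));
    }
}
```

Whatever the conditions are, rolling only moves a file to the pending state; it still takes a successful checkpoint for the file to be published.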
You are correct here that this is an undesired behavior for jobs with finite input. The reason is that due to legacy, the close() method is called for both normal (i.e. successful) and abnormal (i.e. failure) termination of the job. Given this, we could not simply declare the pending files as valid because in case of a failure, we would have inconsistencies. So we went for the conservative approach which simply discards the files.
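The part-file lifecycle and the conservative close() described in this thread can be modeled roughly as below. This is a toy state machine in plain Java, not the actual StreamingFileSink or Bucket code: every class, method, and field name is hypothetical, and it assumes that close() discards everything that has not yet been published.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the part-file lifecycle; NOT the real StreamingFileSink code. */
final class PartFileLifecycleSketch {
    enum State { IN_PROGRESS, PENDING, FINISHED, DISCARDED }

    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;
        long stagedByCheckpoint = -1; // id of the checkpoint that staged this file, -1 if none

        PartFile(String name) { this.name = name; }
    }

    private final List<PartFile> files = new ArrayList<>();

    PartFile open(String name) {
        PartFile f = new PartFile(name);
        files.add(f);
        return f;
    }

    /** Step 1: the rolling policy closes the in-progress file and puts it in pending state. */
    void roll(PartFile f) {
        if (f.state == State.IN_PROGRESS) f.state = State.PENDING;
    }

    /** Step 2: the next checkpoint stages the pending files for commit. */
    void snapshot(long checkpointId) {
        for (PartFile f : files) {
            if (f.state == State.PENDING && f.stagedByCheckpoint < 0) {
                f.stagedByCheckpoint = checkpointId;
            }
        }
    }

    /** Step 3: files staged by a checkpoint are published once it is declared successful. */
    void notifyCheckpointComplete(long checkpointId) {
        for (PartFile f : files) {
            if (f.state == State.PENDING && f.stagedByCheckpoint >= 0
                    && f.stagedByCheckpoint <= checkpointId) {
                f.state = State.FINISHED;
            }
        }
    }

    /** Conservative close: it cannot tell success from failure, so unpublished files are dropped. */
    void close() {
        for (PartFile f : files) {
            if (f.state != State.FINISHED) f.state = State.DISCARDED;
        }
    }

    public static void main(String[] args) {
        PartFileLifecycleSketch sink = new PartFileLifecycleSketch();
        PartFile early = sink.open("bucket-1/part-0");
        sink.roll(early);
        sink.snapshot(1);
        sink.notifyCheckpointComplete(1);
        PartFile late = sink.open("bucket-2/part-0"); // written after the last checkpoint
        sink.close();
        System.out.println(early.name + " -> " + early.state);
        System.out.println(late.name + " -> " + late.state);
    }
}
```

In this model, a file written after the last completed checkpoint never reaches the published state when the job ends, which matches the problem reported for finite inputs.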
some checkpoints to come and succeed, before canceling the job.

I know and I agree that this is not the most elegant solution, but until the root problem is fixed, I think that this is the best solution.

I hope this helps,
Kostas
|
Hi Kostas,

Sorry for jumping in on this discussion :)

What you suggest for finite sources, waiting for checkpoints, is pretty ugly in many cases, especially if you would otherwise read from a finite source (a file, for instance) and want to end the job as soon as possible.

Would it make sense to not discard and delete the buckets, and just keep the files? They could be marked somewhat differently from pending files, but at least they would still be accessible to users in these cases.

Gyula

Kostas Kloudas <[hidden email]> wrote (on Wed, Aug 22, 2018, 10:36):
|
Thanks for the detailed answer.

So the current behavior is correct, and it is due to the legacy design, which does not distinguish between success and failure when closing the sink.

The workaround, then, is to use a short bucket interval to commit the last received data and wait for the next checkpoint (but how do I do that if my job has already finished?).

Is there any other solution, such as using another sink?
--
On August 22, 2018 at 10:48:57 AM, Gyula Fóra ([hidden email]) wrote:
|