Can StreamingFileSink be used instead of the BucketingSink (https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html)? It looks like it could. Take this code, for example:
A few things I see in the new RollingFileSink, compared with the BucketingSink, that I am not sure I follow:
1. I never see the in-progress file move to the pending state (i.e., get renamed as pending), as it did in the BucketingSink. I would assume that it would become pending and then be finalized on checkpoint, for exactly-once semantics?
2. I see dangling in-progress files at the end of the day. With withBucketCheckInterval set to 1 minute by default, I would assume shouldRollOnProcessingTime should kick in?
3. The in-progress files are named like .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is that additional suffix?
I have the following set up on the env: env.enableCheckpointing(10 * 60000); (i.e., a checkpoint every 10 minutes)
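For question 3, the suffix can at least be inspected: it has the 8-4-4-4-12 hex layout of a UUID, and its version nibble marks it as a version-4 (randomly generated) UUID. A minimal Java sketch, assuming the name layout is `.<prefix>-<subtask>-<counter>.inprogress.<uuid>` as in the example name; reading the two numbers as subtask index and part counter is an assumption, and the class name is hypothetical:

```java
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified parser for in-progress names shaped like
// ".<prefix>-<subtask>-<counter>.inprogress.<suffix>".
// The layout is inferred from the example name in this thread,
// not taken from Flink's source code.
class InProgressName {
    private static final Pattern IN_PROGRESS =
            Pattern.compile("^\\.(.+)-(\\d+)-(\\d+)\\.inprogress\\.(.+)$");

    static final class Parsed {
        final String prefix;
        final int subtask;
        final long counter;
        final UUID suffix;

        Parsed(String prefix, int subtask, long counter, UUID suffix) {
            this.prefix = prefix;
            this.subtask = subtask;
            this.counter = counter;
            this.suffix = suffix;
        }
    }

    static Parsed parse(String fileName) {
        Matcher m = IN_PROGRESS.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("not an in-progress name: " + fileName);
        }
        // UUID.fromString rejects suffixes that are not UUID-shaped.
        return new Parsed(m.group(1),
                Integer.parseInt(m.group(2)),
                Long.parseLong(m.group(3)),
                UUID.fromString(m.group(4)));
    }

    public static void main(String[] args) {
        Parsed p = parse(".part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14");
        // version 4 = randomly generated UUID; variant 2 = RFC 4122 layout
        System.out.println(p.prefix + " subtask=" + p.subtask + " counter=" + p.counter
                + " uuidVersion=" + p.suffix.version() + " uuidVariant=" + p.suffix.variant());
    }
}
```

A random suffix like this keeps each staging file name unique, so a restarted attempt does not collide with a leftover file of the same part number.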
I think the only rolling policy that can be used to ensure exactly-once is OnCheckpointRollingPolicy.
Tim
On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi wrote:
Thanks for the quick reply. I am confused. If this were a more full-featured BucketingSink, I would imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime, an in-progress file could move into the pending phase, and on checkpoint the pending part file would be finalized. For exactly-once, any in-progress file would have its length snapshotted to the checkpoint. On a resume from that checkpoint, that length would be used to truncate the file (if truncate is supported) or written out as a valid-length file (if it is not), to indicate which part of the finalized file (finalized when resumed) is valid. I had always assumed (and there is no doc saying otherwise) that shouldRollOnCheckpoint would behave like the other two, except that it does the roll and finalize steps in a single step on a checkpoint.
Am I better off using the BucketingSink? When to use the BucketingSink and when to use the RollingSink is not clear at all, even though on the surface the RollingSink sure looks like a better version of the BucketingSink (or not).
Regards.
On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor wrote:
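The truncate-based recovery described above can be modeled in plain Java. A minimal sketch, assuming a single local file whose only checkpointed state is its byte length; it covers only the truncate branch, not the valid-length-file fallback, and the class and helper names are hypothetical (this is not Flink's code):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Toy model: snapshot the length of the in-progress file at checkpoint time,
// and on restore truncate away everything written after that checkpoint.
class TruncateOnRestore {
    static void append(Path file, String text) throws IOException {
        Files.write(file, text.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // "Checkpoint": the only state kept for the file is its valid length.
    static long snapshotLength(Path file) throws IOException {
        return Files.size(file);
    }

    // "Restore": cut the file back to the checkpointed length.
    static void truncateTo(Path file, long validLength) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ch.truncate(validLength);
        }
    }

    static String runScenario() {
        try {
            Path file = Files.createTempFile("part-0-0", ".inprogress");
            try {
                append(file, "a\n");
                append(file, "b\n");
                long valid = snapshotLength(file); // checkpoint covers "a" and "b"
                append(file, "c\n");               // written after the checkpoint
                // ...the job fails here and is restored from the checkpoint
                truncateTo(file, valid);
                return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(runScenario()); // prints "a" and "b" on separate lines
    }
}
```

The record written after the checkpoint ("c") is discarded on restore; once the source replays from the checkpointed position, it is written again exactly once.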
That said, in the DefaultRollingPolicy the shouldRollOnCheckpoint check seems to be on the file size (mimicking the check in shouldRollOnEvent()). I guess the question is: is the call to shouldRollOnCheckpoint made by the checkpointing thread? And are the calls to the other two methods, shouldRollOnEvent and shouldRollOnProcessingTime, made on the execution thread, i.e., inlined?
On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi wrote:
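Vishal's observation, that the checkpoint hook checks the file size just like the per-event hook, can be illustrated with a stand-alone policy object. A minimal sketch, assuming a size threshold for the checkpoint and event hooks plus rollover and inactivity intervals for the timer hook; the class name, parameters, and time-based rule are assumptions of this sketch (it is not Flink's RollingPolicy interface), and it says nothing about which thread calls each hook:

```java
// Hypothetical, simplified rolling policy with the three hooks discussed in
// this thread. Parameters are plain numbers instead of a part-file state
// object, and the thresholds are illustrative, not Flink's defaults.
class SizeTimeRollingPolicy {
    private final long maxPartSizeBytes;
    private final long rolloverIntervalMs;
    private final long inactivityIntervalMs;

    SizeTimeRollingPolicy(long maxPartSizeBytes, long rolloverIntervalMs, long inactivityIntervalMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rolloverIntervalMs;
        this.inactivityIntervalMs = inactivityIntervalMs;
    }

    // Asked when a checkpoint is taken: roll only if the part file is too large.
    boolean shouldRollOnCheckpoint(long partSizeBytes) {
        return partSizeBytes > maxPartSizeBytes;
    }

    // Asked for each incoming record: the same size check.
    boolean shouldRollOnEvent(long partSizeBytes) {
        return partSizeBytes > maxPartSizeBytes;
    }

    // Asked by a periodic timer: roll files that are too old or idle too long.
    boolean shouldRollOnProcessingTime(long creationTimeMs, long lastUpdateTimeMs, long nowMs) {
        return nowMs - creationTimeMs >= rolloverIntervalMs
                || nowMs - lastUpdateTimeMs >= inactivityIntervalMs;
    }

    public static void main(String[] args) {
        SizeTimeRollingPolicy p = new SizeTimeRollingPolicy(1024, 60_000, 60_000);
        System.out.println(p.shouldRollOnEvent(512));                        // false
        System.out.println(p.shouldRollOnCheckpoint(2048));                  // true
        System.out.println(p.shouldRollOnProcessingTime(0, 10_000, 59_999)); // false
        System.out.println(p.shouldRollOnProcessingTime(0, 10_000, 60_000)); // true
    }
}
```

In this model a checkpoint rolls a file only once it exceeds the size threshold, so a small file stays open across checkpoints until the timer hook or a later event rolls it.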
My apologies for not seeing your use case properly. As highlighted in the docs, the constraint on the rolling policy applies only to bulk formats such as Parquet. As for your other questions, I'll have to defer to others more familiar with the sink; I mostly just use bulk formats such as Avro and Parquet.
Tim
On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi wrote:
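With a bulk format, that constraint means the number and size of part files follow the checkpoint cadence: nothing rolls on events or timers, and every checkpoint closes the open file (presumably because a bulk-encoded file such as Parquet is only complete once its footer is written on close). A minimal simulation, assuming purely for simplicity that a checkpoint happens every N records; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation of a checkpoint-only rolling policy: never roll on events
// or timers, always roll the open part file when a checkpoint is taken.
class CheckpointOnlyRolling {
    static final class Result {
        final List<Integer> closedPartSizes = new ArrayList<>(); // records per rolled file
        int inProgressRecords;                                   // records in the still-open file
    }

    static Result simulate(int records, int checkpointEvery) {
        Result r = new Result();
        int open = 0;
        for (int i = 1; i <= records; i++) {
            open++; // an incoming record never triggers a roll
            if (i % checkpointEvery == 0) {
                r.closedPartSizes.add(open); // every checkpoint rolls the open file
                open = 0;
            }
        }
        r.inProgressRecords = open;
        return r;
    }

    public static void main(String[] args) {
        Result r = simulate(10, 4);
        System.out.println(r.closedPartSizes + " in-progress=" + r.inProgressRecords);
    }
}
```

With 10 records and a checkpoint every 4, two files of 4 records are rolled and 2 records remain in the open file; more frequent checkpoints mean more, smaller files.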
No need to apologize. Thank you for the input.
On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor wrote:
Anyone?
On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi wrote:
Hi Vishal,
Kostas (in CC) should be able to help here.
Best,
Fabian
On Mon, Feb 11, 2019 at 00:05, Vishal Santoshi wrote:
Thanks, Fabian. More questions:
1. On a k8s standalone job cluster I had env.getCheckpointConfig().setFailOnCheckpointingErrors(true) (the default). The job failed on a checkpoint, and I would have imagined that under HA the job would restore from the last checkpoint, but it did not. The UI showed that the job had restarted without a restore: the state was wiped out and the job was relaunched with no state.
2. I had the in-progress files from that failed instance, which is consistent with no state being restored.
Thus, a few questions:
1. In k8s with a standalone job cluster, have we tested the scenario of the container failing (with the pod remaining intact) and then restoring? In this case the pod remained up and running, but it was definitely a clean relaunch of the container the pod was executing.
2. Am I missing any configuration, given the below? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
3. What is the nature of the RollingFileSink? How does it enable exactly-once semantics (or does it not)?
Any help will be appreciated.
Regards.
On Mon, Feb 11, 2019 at 5:00 AM Fabian Hueske wrote:
Hi Vishal,
For the StreamingFileSink vs. the Rolling/BucketingSink: you can use the StreamingFileSink instead of the Rolling/BucketingSink. You can see the StreamingFileSink as an evolution of the previous two. In the StreamingFileSink, files in the pending state are not renamed; they keep their "in-progress" name. This is why you no longer see .pending files. What Timothy said about bulk formats is correct: they only support the "onCheckpoint" rolling policy.
As for the second issue, about deployment, I would recommend opening a new thread so that people can tell from the title whether they can help. It is also good for the community to have a title that indicates the topic: the mailing list is searchable by search engines, so if someone has a similar question, the title will help them find the relevant thread.
Cheers,
Kostas
On Thu, Feb 14, 2019 at 12:09 PM Vishal Santoshi wrote:
Kostas Kloudas | Software Engineer
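Kostas's point that pending files keep their in-progress name, and only change name once the checkpoint is confirmed, can be made concrete with a small state model. A minimal sketch, assuming the final name is the in-progress name without the leading dot and the `.inprogress.<uuid>` suffix (e.g. `part-4-45`); the class and method names are hypothetical and this is not Flink's implementation:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

// Toy model of the part-file lifecycle: a rolled file becomes PENDING but
// keeps its in-progress name, and is renamed only once the checkpoint it
// belongs to completes.
class PartFileLifecycle {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String finalName;
        String currentName;
        State state = State.IN_PROGRESS;

        PartFile(String finalName) {
            this.finalName = finalName;
            // Hidden staging name; the UUID suffix mirrors the names seen in this thread.
            this.currentName = "." + finalName + ".inprogress." + UUID.randomUUID();
        }
    }

    private final List<PartFile> rolledSinceLastSnapshot = new ArrayList<>();
    private final Map<Long, List<PartFile>> pendingByCheckpoint = new TreeMap<>();

    // Rolling closes the file: it becomes PENDING, but its name does not change.
    void roll(PartFile f) {
        f.state = State.PENDING;
        rolledSinceLastSnapshot.add(f);
    }

    // Taking a checkpoint records which pending files it is responsible for.
    void snapshot(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(rolledSinceLastSnapshot));
        rolledSinceLastSnapshot.clear();
    }

    // Only a completed checkpoint commits (renames) its pending files,
    // together with those of any earlier checkpoint not yet committed.
    void onCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<PartFile>>> it = pendingByCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<PartFile>> entry = it.next();
            if (entry.getKey() > checkpointId) {
                break; // TreeMap iterates in ascending checkpoint order
            }
            for (PartFile f : entry.getValue()) {
                f.currentName = f.finalName;
                f.state = State.FINISHED;
            }
            it.remove();
        }
    }

    public static void main(String[] args) {
        PartFileLifecycle sink = new PartFileLifecycle();
        PartFile f = new PartFile("part-4-45");
        sink.roll(f);
        sink.snapshot(1);
        System.out.println(f.state + " " + f.currentName); // PENDING, still the .inprogress name
        sink.onCheckpointComplete(1);
        System.out.println(f.state + " " + f.currentName); // FINISHED part-4-45
    }
}
```

In this model, a failure before the checkpoint completes leaves only hidden in-progress files behind, which matches the dangling in-progress files reported earlier in the thread.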
Awesome, thanks! Will open a new thread. But yes, the explanation about the in-progress files was helpful.
On Thu, Feb 14, 2019, 7:50 AM Kostas Kloudas wrote: