The behavior of BucketingSink is not exactly what we want.
If I understood correctly, when a checkpoint is requested, BucketingSink flushes the writer to make sure no data is lost, but it neither closes the file nor rolls a new file after the checkpoint. In the case of HDFS, if the file length is not updated at the NameNode (by closing the file or by explicitly updating the file length), MapReduce or other data analysis tools will not read the new data. This is not what we want. I also want to open a new file for each checkpoint period to make sure the HDFS file is persistent, because we have hit some bugs in the flush/append HDFS use case. Is there any way to let BucketingSink roll the file on each checkpoint? Thanks in advance.
Hi,

The BucketingSink closes files once they reach a certain size (BatchSize) or have not been written to for a certain amount of time (InactiveBucketThreshold). While being written to, files are in an in-progress state and are only moved to completed once they are closed. When that happens, other systems can pick up the file and process it. Processing a non-closed file would cause many problems.

You can of course take the BucketingSink code and adapt it to your use case.

Best,
Fabian

2018-03-20 2:13 GMT+01:00 XilangYan <[hidden email]>:
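For reference, the two thresholds Fabian mentions are configured directly on the sink. Below is a minimal sketch of that configuration; the base path, bucket format, threshold values and the 'stream' variable are placeholders, not values from this thread.

    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

    BucketingSink<String> sink = new BucketingSink<>("hdfs:///data/events"); // placeholder base path
    sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));        // one bucket per hour
    sink.setBatchSize(1024L * 1024L * 400L);         // roll a part file once it reaches ~400 MB
    sink.setInactiveBucketThreshold(60 * 1000L);     // close buckets not written to for 60 s
    sink.setInactiveBucketCheckInterval(60 * 1000L); // how often inactivity is checked
    stream.addSink(sink);                            // 'stream' is an assumed DataStream<String>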
Thank you, Fabian!
The HDFS small-file problem can be avoided with a big checkpoint interval. Meanwhile, there is a potential data-loss problem in the current BucketingSink. Say we consume data from Kafka: when a checkpoint is requested, the Kafka offset is updated, but the in-progress file in BucketingSink remains. If Flink crashes after that, the data in the in-progress file is lost. Am I right?
Hi,

Flink maintains its own Kafka offsets in its checkpoints and does not rely on Kafka's offset management. Please check the JavaDocs and let me know if you have questions or doubts about the mechanism.

Best,
Fabian

2018-03-21 7:38 GMT+01:00 XilangYan <[hidden email]>:
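As an illustration of this mechanism (not code from this thread), the sketch below wires a Kafka source into a checkpointed job so that the consumer's offsets are stored in Flink's checkpoints rather than managed by Kafka; the checkpoint interval, broker address, group id and topic name are placeholders.

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE); // offsets are part of these checkpoints

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker
    props.setProperty("group.id", "hdfs-writer");          // placeholder group id

    FlinkKafkaConsumer011<String> consumer =
            new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props); // placeholder topic
    env.addSource(consumer).addSink(sink); // e.g. the BucketingSink from the earlier sketch

On recovery, the source rewinds to the offsets stored in the last completed checkpoint, so no record is skipped because of Kafka-side offset commits.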
OK, then maybe I have a misunderstanding about checkpoints.
I thought Flink uses checkpoints to store offsets, but when the Kafka connector makes a checkpoint, it does not know whether data is in an in-progress file or in a pending file, so the whole offset is saved in the checkpoint. My guess was that the data in the in-progress file may be lost when a checkpoint is requested.
Hi Fabian,
Finally I had time to read the code, and it is brilliant: it does provide an exactly-once guarantee. However, I still suggest adding the ability to close a file when a checkpoint is made. I noticed there is an enhancement, https://issues.apache.org/jira/browse/FLINK-9138, which can close files on a time-based rollover, but it is not very accurate.

Our use case is that we read data from a message queue, write it to HDFS, and our ETL team then uses the data in HDFS. The ETL team needs to know precisely when all data is ready to be read, so we use a counter to track how many records have been written; when that counter equals the number of records we received, we consider the HDFS files ready. We send the counter message in a custom sink so ETL knows how many records have been written, but with the current BucketingSink, even though the HDFS file is flushed, ETL may still not be able to read the data. If we can close the file during the checkpoint, then the result is accurate. As for the HDFS small-file problem, it can be controlled by using a bigger checkpoint interval.

I did take the BucketingSink code and adapt it to our case, but if this can be done in Flink, we can save the effort of maintaining our own branch.

Thanks!
Jeffrey
Hi Xilang,

I think you are working together with the offline team; as you said, the ETL team wants to use the data in HDFS. I would like to confirm one question: what is their scheduling interval for each job, 5 minutes or 10 minutes?

> My use case is we read data from a message queue, write to HDFS, and our ETL ... ready to be read accurately

I think you are looking for functionality that lets the ETL team know when a bucket is ready for them to use. Correct? If yes, please take a look at this JIRA: https://issues.apache.org/jira/browse/FLINK-9609

Cheers,
Minglei
By the way, I do not think the approach below is a correct way to do it. As @Fabian said, the BucketingSink closes files once they reach a certain size (BatchSize) or have not been written to for a certain amount of time (InactiveBucketThreshold).

> If we can close the file during the checkpoint, then the result is accurate.

After a checkpoint, a finished file's name carries neither the .pending nor the .in-progress suffix. So you can check the file names under every bucket to let the ETL team know when a bucket is ready for use.

Cheers,
Minglei
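As a rough illustration of that file-name check (my sketch, not code from this thread), the ETL side could test a bucket directory like this; the ".in-progress" and ".pending" suffixes are the BucketingSink defaults and need to be adjusted if the sink is configured with different prefixes or suffixes.

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Returns true when no part file in the bucket directory is still in-progress or pending.
    static boolean bucketIsReady(FileSystem fs, Path bucketDir) throws IOException {
        for (FileStatus status : fs.listStatus(bucketDir)) {
            String name = status.getPath().getName();
            if (name.endsWith(".in-progress") || name.endsWith(".pending")) {
                return false;
            }
        }
        return true;
    }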
Thank you, Minglei,
I should describe my current flow and my requirement more clearly.

1. Any data we collect has a send-time.
2. When we collect data, we also send a counter message, e.g. "we have collected 45 messages whose send-time is 2018-07-02 10:02:00".
3. The data is sent to Kafka (or another message system), and Flink receives the data from Kafka and writes it to HDFS.
4. When Flink has finished part of the messages (neither .pending nor .in-progress; "finished" must be a state in which other systems can read the file), we send another counter message, e.g. "we have processed 40 messages whose send-time is 2018-07-02 10:02:00".

What I have done in Flink is:

1. I added a config to BucketingSink named rolloverOnCheckpoint.
2. I added another sink, a CounterSink, which counts messages by send-time.
3. In BucketingSink.snapshotState, if rolloverOnCheckpoint is set to true, I close the current files and move them to the pending state.
4. In CounterSink.snapshotState, I prepare the special counter message to be sent.
5. When the checkpoint completes, BucketingSink.notifyCheckpointComplete moves the pending files to the finished state, and CounterSink.notifyCheckpointComplete sends the special counter message.

So in our counter system, when the processed-message counter is equal to the received-message counter, it means ETL can continue their jobs. (A sketch of the CounterSink part of this flow follows below.)

The JIRA you submitted is not exactly what I want, but it would be great if we could figure out a common solution to this requirement, although I think it is difficult unless, as you said, we add some assumption like a watermark. On the other hand, I think the watermark behavior may already be achievable by combining inactiveBucketThreshold and batchRolloverInterval.
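For readers of the archive, a minimal sketch of what the CounterSink side of this flow could look like. It only illustrates the idea described above, not the actual code from the thread: state restoration is omitted, and extractSendTime()/sendCounterMessage() are hypothetical hooks for the send-time field and the counter system.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.runtime.state.CheckpointListener;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public abstract class CounterSink<T> extends RichSinkFunction<T>
            implements CheckpointedFunction, CheckpointListener {

        // Counts per send-time, accumulated since the last checkpoint.
        private final Map<String, Long> pendingCounts = new HashMap<>();
        // Counts frozen in snapshotState(), waiting for the checkpoint to complete.
        private final Map<Long, Map<String, Long>> countsByCheckpoint = new HashMap<>();

        @Override
        public void invoke(T value, Context context) {
            pendingCounts.merge(extractSendTime(value), 1L, Long::sum);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext ctx) {
            // Freeze the counts for this checkpoint; they describe the same records that the
            // (modified) BucketingSink moves to pending during the same checkpoint.
            countsByCheckpoint.put(ctx.getCheckpointId(), new HashMap<>(pendingCounts));
            pendingCounts.clear();
        }

        @Override
        public void initializeState(FunctionInitializationContext ctx) {
            // Restoring counts from checkpointed state is omitted in this sketch.
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            // Only after the checkpoint completed (i.e. pending files were finalized)
            // is the counter message published to the downstream counter system.
            Map<String, Long> counts = countsByCheckpoint.remove(checkpointId);
            if (counts != null) {
                counts.forEach(this::sendCounterMessage);
            }
        }

        protected abstract String extractSendTime(T value);                      // hypothetical accessor
        protected abstract void sendCounterMessage(String sendTime, long count); // hypothetical sender
    }

Note that the per-checkpoint map is kept only in memory here, and notifyCheckpointComplete() is not guaranteed to be called after a failure, so a real implementation needs additional care on recovery.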
Hi Xilang,

Let me try to summarize your requirements. If I understood you correctly, you are not only concerned about the exactly-once guarantees but also need a consistent view of the data: the data in all files that are finalized needs to originate from a prefix of the stream, i.e., all records written to the finalized files must have a timestamp smaller than or equal to some T, and there must not be a record with a timestamp larger than T.

I think this can be achieved with the current implementation and event-time processing. If the result of a job is emitted by a timer which is triggered by watermarks, you will have the prefix property (even taking out-of-order records into account!). The reason is that watermarks and checkpoint barriers are shipped like regular data records. By using timers, all computations are "synchronized" by the watermarks, which cannot be overtaken by checkpoint barriers.

Best,
Fabian

2018-07-02 4:22 GMT+02:00 XilangYan <[hidden email]>:
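To make the timer-based pattern Fabian describes concrete, here is a small sketch (my illustration, not code from this thread): per-minute counts live in keyed state and are only emitted from onTimer(), i.e. after the watermark has passed the minute they belong to, so everything emitted refers to a prefix of the stream. The Tuple2<String, Long> input (key, send-time in milliseconds) and the one-minute granularity are assumptions.

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class PerMinuteCounter
            extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<Long, Long>> {

        private transient MapState<Long, Long> counts; // minute start -> count

        @Override
        public void open(Configuration parameters) {
            counts = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("counts", Long.class, Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> record, Context ctx,
                                   Collector<Tuple2<Long, Long>> out) throws Exception {
            long minute = record.f1 - (record.f1 % 60_000L);
            Long current = counts.get(minute);
            counts.put(minute, current == null ? 1L : current + 1L);
            // Fires once the watermark passes the end of this minute.
            ctx.timerService().registerEventTimeTimer(minute + 60_000L - 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx,
                            Collector<Tuple2<Long, Long>> out) throws Exception {
            long minute = timestamp - 60_000L + 1;
            Long count = counts.get(minute);
            if (count != null) {
                // Emitted only after the watermark, so the output refers to a prefix of the stream.
                out.collect(Tuple2.of(minute, count));
                counts.remove(minute);
            }
        }
    }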
Hi Fabian,
We do need a consistent view of the data: we need the Counter and the HDFS files to be consistent. For example, when the Counter indicates that 1000 messages were written to HDFS, there must be exactly 1000 messages in HDFS ready to be read.

The data we write to HDFS is collected by an agent (which also sends a Counter message with the number of messages received). Each record has a timestamp, and we use BucketingSink to write the data into different buckets.

Could you give me a clue how to achieve this with watermarks? As I understand it, a watermark is designed to process out-of-order data with a known delay; how can it be used to make my CounterSink and BucketingSink consistent?

Thanks,
Xilang
Hi Xilang,

I thought about this again. The bucketing sink would need to roll on event-time intervals (similar to the current processing-time rolling) which are triggered by watermarks in order to support consistency. However, it would also need to maintain a write-ahead log of all received rows and could only write those that are smaller than the received watermark. This would make the whole sink more complex.

I'm not sure that rolling on checkpoint barriers is a good solution either. IMO, the checkpointing interval and the file-rolling interval should not depend on each other, because coupling them mixes different requirements and introduces challenging trade-offs.

Best,
Fabian

2018-07-04 11:59 GMT+02:00 XilangYan <[hidden email]>:
Hi Fabian,
With the watermark approach, I understand it could only write records that are smaller than the received watermark, but may I ask why it would also need to maintain a write-ahead log of all received rows? When an event is received, the sink could just compare its time with the current watermark, write it to the correct bucket if it is smaller than the watermark, and otherwise drop it.

With this assumption, BucketingSink could close every bucket that is older than the current watermark; I think this makes sense, as the data in those buckets won't change anymore. The close action could be done in the checkpoint callback or on every received event.

This would implement the bucket-ready mechanism @Minglei suggested in https://issues.apache.org/jira/browse/FLINK-9609 using the existing watermark mechanism. And I think we don't need a separate "bucket watermark" concept, as it would be confused with the existing watermark.

Thanks,
Xilang
Hi Xilang,
You can watch the JIRA you referred to; I will work on it in the next couple of days.

Cheers,
Minglei

> On 9 Jul 2018, at 09:50, XilangYan <[hidden email]> wrote: