Let BucketingSink roll file on each checkpoint

Let BucketingSink roll file on each checkpoint

XilangYan
The behavior of BucketingSink is not exactly what we want.
If I understand correctly, when a checkpoint is requested, BucketingSink
flushes its writer to make sure no data is lost, but it neither closes the
file nor rolls a new file after the checkpoint.
In the case of HDFS, if the file length is not updated at the NameNode
(by closing the file or explicitly updating its length), MapReduce and other
data-analysis tools will not read the new data. This is not what we want.
I would also like to open a new file for each checkpoint period to make sure
the HDFS file is persisted, because we have hit some bugs in the flush/append
HDFS use case.

Is there any way to let BucketingSink roll files on each checkpoint? Thanks in
advance.

Re: Let BucketingSink roll file on each checkpoint

Fabian Hueske-2
Hi,

The BucketingSink closes files once they reach a certain size (batchSize) or have not been written to for a certain amount of time (inactiveBucketThreshold).
While being written to, files are in an in-progress state and are only moved to completed once they are closed. At that point, other systems can pick up a file and process it.
Processing a non-closed file would cause many problems.

However, closing files on every checkpoint would likely result in many small files, which HDFS doesn't handle well.
You can of course take the BucketingSink code and adapt it to your use case.
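
For illustration, configuring those two thresholds looks roughly like this (a sketch; the path and values are placeholders):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// Placeholder base path and thresholds; tune them for your workload.
BucketingSink<String> sink = new BucketingSink<>("hdfs:///data/out");
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));
sink.setBatchSize(1024 * 1024 * 128L);           // roll a part file at 128 MB
sink.setInactiveBucketThreshold(60 * 1000L);     // close buckets idle for 1 minute
sink.setInactiveBucketCheckInterval(30 * 1000L); // check for idle buckets every 30 s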

Best, Fabian

Re: Let BucketingSink roll file on each checkpoint

XilangYan
Thank you, Fabian!

The HDFS small-file problem can be avoided with a large checkpoint interval.

Meanwhile, there is a potential data-loss problem in the current BucketingSink.
Say we consume data from Kafka: when a checkpoint is requested, the Kafka
offset is updated, but the in-progress file in BucketingSink remains as it is.
If Flink crashes after that, the data in the in-progress file is lost. Am I right?

Re: Let BucketingSink roll file on each checkpoint

Fabian Hueske-2
Hi,

Flink maintains its own Kafka offsets in its checkpoints and does not rely on Kafka's offset management.
That way, Flink guarantees that the read offsets and the checkpointed operator state are always aligned.
The BucketingSink is designed not to lose any data; its mode of operation is described in detail in the JavaDocs of the class [1].

Please check the JavaDocs and let me know if you have questions or doubts about the mechanism.
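
For illustration, a minimal job wiring this together might look like the following sketch (topic, addresses, and paths are placeholders for a 2018-era setup):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class KafkaToHdfsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka offsets are snapshotted into Flink's own checkpoints,
        // aligned with all other operator state.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder address
        props.setProperty("group.id", "flink-hdfs-writer");   // placeholder group id

        env.addSource(new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props))
           .addSink(new BucketingSink<String>("hdfs:///data/out")); // placeholder path

        env.execute("kafka-to-hdfs");
    }
}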

Best, Fabian

Re: Let BucketingSink roll file on each checkpoint

XilangYan
OK, then maybe I had a misunderstanding about checkpoints.
I thought Flink used checkpoints to store offsets, but that when the Kafka
connector makes a checkpoint, it doesn't know whether the data is in an
in-progress file or in a pending file, so the whole offset is saved in the
checkpoint. That is why I guessed that the data in the in-progress file may be
lost when a checkpoint is requested.

Re: Let BucketingSink roll file on each checkpoint

XilangYan
Hi Fabian,

I finally had time to read the code, and it is brilliant: it does provide an
exactly-once guarantee.
However, I still suggest adding an option to close files when a checkpoint is
made. I noticed there is an enhancement,
https://issues.apache.org/jira/browse/FLINK-9138, which can close files on a
time-based rollover, but it is not very accurate.
Our use case: we read data from a message queue and write it to HDFS, and our
ETL team consumes the data from HDFS. The ETL team needs to know precisely
when all the data is ready to be read, so we use a counter to track how many
records have been written; when that counter equals the number of records we
received, we consider the HDFS files ready. We send the counter message from a
custom sink so the ETL team knows how many records have been written, but with
the current BucketingSink, even though the HDFS file is flushed, the ETL team
may still be unable to read the data. If we could close files during the
checkpoint, the result would be accurate. As for the HDFS small-file problem,
it can be controlled by using a larger checkpoint interval.

I did take the BucketingSink code and adapt it to our case, but if this could
be done in Flink itself, it would save us maintaining our own branch.

Thanks!
Jeffrey

Re: Let BucketingSink roll file on each checkpoint

zhangminglei
Hi Xilang,

I think you are working jointly with an offline team, the ETL team that wants to use the data in HDFS. I would like to confirm one thing: what is their scheduling interval for each job? 5 minutes or 10 minutes?

> Our use case: we read data from a message queue and write it to HDFS, and our
> ETL team consumes the data from HDFS. The ETL team needs to know precisely
> when all the data is ready to be read.

I think you want a mechanism that lets the ETL team know when a bucket is ready for them to use. Correct? If yes, please take a look at this JIRA: https://issues.apache.org/jira/browse/FLINK-9609

Cheers
Minglei

Re: Let BucketingSink roll file on each checkpoint

zhangminglei
By the way, I do not think the approach below is correct. As Fabian said, the BucketingSink closes files once they reach a certain size (batchSize) or have not been written to for a certain amount of time (inactiveBucketThreshold).

> If we could close files during the checkpoint, the result would be accurate.

Also, please take a look at the BucketingSink code. It says there are closed files that we are not currently writing to, but which have not yet been confirmed by a checkpoint:

/**
 * The suffix for {@code pending} part files. These are closed files that we are
 * not currently writing to (inactive or reached {@link #batchSize}), but which
 * were not yet confirmed by a checkpoint.
 */
private static final String DEFAULT_PENDING_SUFFIX = ".pending";
After a checkpoint completes, a file's name carries neither the .pending nor the .in-progress suffix. So you can check the file names under every bucket to let the ETL team know when a bucket is ready for use.
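
For example, a readiness check along those lines (a sketch assuming the default suffixes, which are configurable on the sink):

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class BucketReadiness {
    /**
     * A bucket is considered ready when none of its files still carries the
     * default in-progress or pending suffix of BucketingSink.
     */
    public static boolean isReady(FileSystem fs, Path bucketDir) throws IOException {
        for (FileStatus status : fs.listStatus(bucketDir)) {
            String name = status.getPath().getName();
            if (name.endsWith(".in-progress") || name.endsWith(".pending")) {
                return false;
            }
        }
        return true;
    }
}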

Cheers
Minglei

Re: Let BucketingSink roll file on each checkpoint

XilangYan
Thank you, Minglei.

Let me describe my current flow and my requirements more clearly:
1. Every record we collect has a send-time.
2. When we collect data, we also send a counter message, e.g. "we have
collected 45 messages whose send-time is 2018-07-02 10:02:00".
3. The data is sent to Kafka (or another messaging system); Flink receives it
from Kafka and writes it to HDFS.
4. When Flink has finished a set of messages (neither .pending nor
.in-progress; "finished" means the final state that can be read by other
systems), we send another counter message, e.g. "we have processed 40
messages whose send-time is 2018-07-02 10:02:00".

What I have done in Flink (see the sketch below):
1. I added a config option to BucketingSink named rolloverOnCheckpoint.
2. I added another sink, CounterSink, which counts messages by send-time.
3. In BucketingSink.snapshotState, if rolloverOnCheckpoint is set to true, I
close the current files and move them to the pending state.
4. In CounterSink.snapshotState, I prepare the special counter message to send.
5. When the checkpoint completes, BucketingSink.notifyCheckpointComplete moves
the pending files to the finished state, and CounterSink.notifyCheckpointComplete
sends the special counter message.

So in our counter system, when the processed-message counter equals the
received-message counter, it means the ETL team can continue their jobs.
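
Roughly, the CounterSink half looks like the following simplified sketch (the
input here is just each record's send-time string, extracted upstream, and
sendCounterMessage stands in for our actual publishing call):

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CounterSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    // Counts accumulated since the last checkpoint, keyed by send-time.
    private final Map<String, Long> liveCounts = new HashMap<>();
    // Counts frozen at snapshot time, awaiting checkpoint confirmation.
    private final TreeMap<Long, Map<String, Long>> pendingCounts = new TreeMap<>();
    private transient ListState<Map<String, Long>> checkpointedCounts;

    @Override
    public void invoke(String sendTime) {
        liveCounts.merge(sendTime, 1L, Long::sum);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // Freeze the current counts under this checkpoint's id; they are only
        // published once the checkpoint is confirmed, mirroring the sink's
        // pending -> finished transition.
        pendingCounts.put(ctx.getCheckpointId(), new HashMap<>(liveCounts));
        liveCounts.clear();
        checkpointedCounts.clear();
        for (Map<String, Long> counts : pendingCounts.values()) {
            checkpointedCounts.add(counts);
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // The files covered by this checkpoint are now in their final state,
        // so the matching counter messages can be published.
        while (!pendingCounts.isEmpty() && pendingCounts.firstKey() <= checkpointId) {
            pendingCounts.pollFirstEntry().getValue().forEach(this::sendCounterMessage);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        // (Restore of in-flight counts after a failure is omitted in this sketch.)
        checkpointedCounts = ctx.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-counts",
                        TypeInformation.of(new TypeHint<Map<String, Long>>() {})));
    }

    private void sendCounterMessage(String sendTime, long count) {
        // Hypothetical: publish "processed <count> messages with send-time <sendTime>".
    }
}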

The JIRA you filed is not exactly what I want, but it would be great if we
could figure out a common solution to this requirement, although I think that
is difficult unless, as you said, we add some assumption such as watermarks.
On the other hand, I think the watermark behavior may already be achievable
through the combination of inactiveBucketThreshold and batchRolloverInterval
(a configuration sketch follows below).
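
For example (a sketch; setBatchRolloverInterval is the setter introduced by
FLINK-9138, and the values are placeholders):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// Approximate time-bounded buckets with the existing knobs.
BucketingSink<String> sink = new BucketingSink<>("hdfs:///data/out");
sink.setInactiveBucketThreshold(60 * 1000L);     // close buckets idle for 1 minute
sink.setInactiveBucketCheckInterval(30 * 1000L); // check for idle buckets every 30 s
sink.setBatchRolloverInterval(10 * 60 * 1000L);  // force a roll every 10 minutes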

Re: Let BucketingSink roll file on each checkpoint

Fabian Hueske-2
Hi Xilang,

Let me try to summarize your requirements.
If I understood you correctly, you are not only concerned about the exactly-once guarantees but also need a consistent view of the data:
the data in all finalized files must originate from a prefix of the stream, i.e., every record with a timestamp smaller than or equal to some T is in a finalized file, and no record with a timestamp larger than T is.

I think this can be achieved with the current implementation and event-time processing. If the result of a job is emitted by a timer that is triggered by watermarks, you get the prefix property (even taking out-of-order records into account!).
The reason is that watermarks and checkpoint barriers are shipped like regular data records. By using timers, all computations are "synchronized" by the watermarks, which cannot be overtaken by checkpoint barriers.
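
A minimal sketch of that emit-on-watermark pattern (illustrative only; Event is a stand-in type):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Stand-in event type: a payload plus its event-time timestamp. */
class Event {
    public String payload;
    public long timestamp;
}

// Buffer records in state and release them only when the event-time timer
// (driven by the watermark) fires, so the emitted output is always a prefix
// of the stream up to the watermark.
public class EmitOnWatermark extends KeyedProcessFunction<String, Event, Event> {

    private transient ListState<Event> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", Event.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        buffer.add(event);
        // Fire once the watermark passes this record's timestamp.
        ctx.timerService().registerEventTimeTimer(event.timestamp);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        // Everything at or below the current watermark can be emitted safely.
        List<Event> later = new ArrayList<>();
        for (Event e : buffer.get()) {
            if (e.timestamp <= timestamp) {
                out.collect(e);
            } else {
                later.add(e);
            }
        }
        buffer.clear();
        for (Event e : later) {
            buffer.add(e);
        }
    }
}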

Best, Fabian

Re: Let BucketingSink roll file on each checkpoint

XilangYan
Hi Fabian,

We do indeed need a consistent view of the data: the Counter and the HDFS files
must be consistent. For example, when the Counter indicates that 1000 messages
were written to HDFS, there must be exactly 1000 messages in HDFS ready to be
read.

The data we write to HDFS is collected by an agent (which also sends a Counter
message recording the number of messages received); each record has a
timestamp, and we use BucketingSink to write the records into different buckets.

Could you give me a clue as to how to achieve this with watermarks? As I
understand it, watermarks are designed to process out-of-order data with a
known delay; how can they be used to make my CounterSink and BucketingSink
consistent?

Thanks, Xilang

Re: Let BucketingSink roll file on each checkpoint

Fabian Hueske-2
Hi Xilang,

I thought about this again.
The bucketing sink would need to roll on event-time intervals (similar to the current processing-time rolling), triggered by watermarks, in order to support consistency.
However, it would also need to maintain a write-ahead log of all received rows and could only write out those that are smaller than the received watermark. This would make the whole sink more complex.

I'm not sure that rolling on checkpoint barriers is a good solution either.
IMO, the checkpointing interval and the file-rolling interval should not depend on each other, because that mixes different requirements and introduces challenging trade-offs.

Best, Fabian

Re: Let BucketingSink roll file on each checkpoint

XilangYan
Hi Febian,

With watermark, I understand it could only write those that are smaller than
the received watermark, but could I know why it would also need to maintain
a write ahead log of all received rows? When an event received, it just
compare time with current watermark,  write it to correct bucket if smaller
then watermark, otherwise drop it.

With with assumption, BucketingSink could close all bucket that is older
than current watermark, I think it  make sense as those bucket data won't
change  anymore. The close action could be done in checkpoint callback or
when every event receive.
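
Roughly, in pseudo-Java (illustrative names only; the real BucketingSink keeps
its bucket state and part-file handling private):

// Illustrative only: close every bucket whose newest record is older than
// the current watermark, since no more data can legally arrive for it.
void onWatermarkAdvance(long watermark) throws Exception {
    for (Map.Entry<String, BucketState> entry : openBuckets.entrySet()) {
        BucketState bucket = entry.getValue();
        if (bucket.maxEventTime < watermark) {
            closePartFile(bucket); // moves the part file to the .pending state
        }
    }
}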

This would implement the bucket-ready mechanism @Minglei suggested in
https://issues.apache.org/jira/browse/FLINK-9609 using the existing watermark
mechanism. And I don't think we need a separate BucketWatermark concept, as it
would be confused with the existing watermarks.

Thanks,
Xilang

Re: Let BucketingSink roll file on each checkpoint

zhangminglei
Hi Xilang,

You can watch the JIRA you referred to; I will work on it in the next couple of days.

Cheers
Minglei
