Small-files source - partitioning based on prefix of file

Re: Small-files source - partitioning based on prefix of file

Fabian Hueske-2
Hi Averell,

One comment regarding what you said:

> As my files are small, I think there would not be much benefit in checkpointing file offset state.

Checkpointing is not about efficiency but about consistency.
If the position in a split is not checkpointed, your application won't operate with exactly-once state consistency unless each split produces exactly one record.
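
To make the point about split positions concrete, here is a minimal toy model in Python. It is not Flink code; the function name `count_with_one_failure` and its parameters are made up for this illustration. It counts records, takes one checkpoint, crashes, and recovers. When the offset inside the current split is part of the checkpoint, the final count is exact. When it is not, the records already read from that split are counted twice, unless every split holds exactly one record.

```python
def count_with_one_failure(splits, checkpoint_at, keep_offset):
    """Count records with one checkpoint, one crash, and one recovery.

    splits        -- list of splits, each a list of records
    checkpoint_at -- take the checkpoint after this many records (1..total)
    keep_offset   -- whether the offset inside the current split is checkpointed
    """
    count = 0
    reads = 0
    snapshot = None
    for split_idx, split in enumerate(splits):
        for offset in range(len(split)):
            count += 1
            reads += 1
            if reads == checkpoint_at:
                if offset + 1 == len(split):
                    position = (split_idx + 1, 0)       # split finished, move on
                elif keep_offset:
                    position = (split_idx, offset + 1)  # resume mid-split
                else:
                    position = (split_idx, 0)           # only the split is known
                snapshot = (count, position)
                break
        if snapshot is not None:
            break
    if snapshot is None:
        raise ValueError("checkpoint_at must be between 1 and the number of records")

    # Simulated crash right after the checkpoint: restore state and position.
    count, (first_split, first_offset) = snapshot
    for split_idx in range(first_split, len(splits)):
        start = first_offset if split_idx == first_split else 0
        count += len(splits[split_idx]) - start
    return count


multi = [["a", "b", "c"], ["d", "e"]]
exact = count_with_one_failure(multi, checkpoint_at=2, keep_offset=True)
inflated = count_with_one_failure(multi, checkpoint_at=2, keep_offset=False)
single = count_with_one_failure([["a"], ["b"], ["c"]], checkpoint_at=2, keep_offset=False)
```

In this sketch `exact` matches the five input records, `inflated` is larger because "a" and "b" are read again on top of the restored count, and `single` is still exact because no split is ever interrupted halfway.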

Best, Fabian

2018-08-10 9:10 GMT+02:00 Jörn Franke <[hidden email]>:
Or you write a custom file system for Flink... (for the tar part).
Unfortunately, gz files can only be processed single-threaded (there are some multi-threaded implementations, but they don't bring a big gain).

On 10. Aug 2018, at 07:07, vino yang <[hidden email]> wrote:

Hi Averell,

In this case, I think you may need to extend Flink's existing source.
First, read your large tar.gz file. Once it has been decompressed, use multiple threads to read the records in the source, and then parse the data format (map / flatMap might be suitable operators; you can chain them with the source because these two operators don't require a data shuffle).

Note that Flink doesn't encourage creating extra threads in UDFs, but I don't know if there is a better way for this scenario.
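
Outside of Flink, the general shape of this idea (one sequential reader over the outer archive, a pool of workers for the inner files) can be sketched with the Python standard library. This is only an illustration of the pattern, not a Flink source; `count_lines`, `process_tar_gz` and `build_sample_archive` are hypothetical helper names, and counting lines stands in for the real parsing step.

```python
import gzip
import io
import tarfile
from concurrent.futures import ThreadPoolExecutor


def count_lines(gz_bytes):
    """Decompress one inner .gz member and count its lines."""
    return len(gzip.decompress(gz_bytes).splitlines())


def process_tar_gz(fileobj, workers=4):
    """Read the outer tar.gz sequentially and process inner .gz files in a pool."""
    futures = {}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # "r|gz" reads the archive as a forward-only stream, so the outer
        # gzip layer is decompressed by this single thread.
        with tarfile.open(fileobj=fileobj, mode="r|gz") as archive:
            for entry in archive:
                if entry.isfile() and entry.name.endswith(".gz"):
                    # In stream mode the member must be read before moving on.
                    data = archive.extractfile(entry).read()
                    futures[entry.name] = pool.submit(count_lines, data)
        return {name: future.result() for name, future in futures.items()}


def build_sample_archive(files):
    """Build an in-memory tar.gz whose members are gzip-compressed texts."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as archive:
        for name, text in files.items():
            payload = gzip.compress(text.encode("utf-8"))
            info = tarfile.TarInfo(name=name)
            info.size = len(payload)
            archive.addfile(info, io.BytesIO(payload))
    buffer.seek(0)
    return buffer


sample = build_sample_archive({"a.gz": "1\n2\n", "b.gz": "x\n"})
line_counts = process_tar_gz(sample)
```

Each inner file is held in memory as a whole, which is only reasonable because the files are small. In a Flink job the same split between a sequential reader and parallel workers would more naturally map to a source followed by a parallel operator than to hand-made threads.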

Thanks, vino.

On Fri, Aug 10, 2018 at 12:05 PM, Averell <[hidden email]> wrote:
Hi Fabian, Vino,

I have one more question, for which I initially planned to create a new thread,
but now I think it is better to ask here:
I need to process one big tar.gz file which contains multiple small gz
files. What is the best way to do this? I am thinking of having a single
thread that reads the TarArchiveStream (which has been decompressed
from that tar.gz by Flink automatically), and then distributes the
TarArchiveEntry entries to a multi-threaded operator which would process the
small files in parallel. If this is feasible, which elements of Flink can I
reuse?

Thanks a lot.

Re: Small-files source - partitioning based on prefix of file

Thank you Vino, Jorn, and Fabian.
Please forgive my ignorance, as I am still not able to fully
understand state/checkpointing and the statement that Fabian gave earlier:
"/In either case, some record will be read twice but if reading position can
be reset, you can still have exactly-once state consistency because the
state is reset as well./"

My current understanding is: checkpointing is managed at the
Execution-Environment level, and it would happen at the same time at all the
operators of the pipeline. Is this true?
My concern is how that synchronization is managed. It would be quite
possible that checkpointing happens some milliseconds apart at different
operators, which would lead to duplicated or missed records,
wouldn't it?

I tried to read Flink's documentation about managing state. However, I have
not been able to find the information I am looking for.
Please help point me to the right place.

Thanks and best regards,

Re: Small-files source - partitioning based on prefix of file

Fabian Hueske-2
Hi Averell,

Conceptually, you are right. Checkpoints are taken at every operator at the same "logical" time.
It is not important that each operator checkpoints at the same wall-clock time. Instead, they need to take a checkpoint when they have processed the same input.
This is implemented with so-called Checkpoint Barriers, which are special records that are injected at the sources.
[Simplification] When an operator receives a barrier it performs a checkpoint. [/Simplification]
This way, we do not need to pause the processing of all operators but can perform the checkpoints locally for each operator.
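
The "same logical time" idea can be illustrated with a small toy model in Python. It is not Flink's implementation: `BARRIER`, `CountingOperator` and `run_pipeline` are names invented for this sketch, each operator has a single input (so there is no barrier alignment), and the second operator is deliberately run at half speed.

```python
from collections import deque

BARRIER = object()  # stands in for a checkpoint barrier record


class CountingOperator:
    """Toy operator: counts records and snapshots its count on a barrier."""

    def __init__(self):
        self.inbox = deque()
        self.count = 0
        self.snapshot = None
        self.downstream = None

    def step(self):
        """Process at most one queued element."""
        if not self.inbox:
            return
        element = self.inbox.popleft()
        if element is BARRIER:
            self.snapshot = self.count  # local checkpoint, nobody else pauses
        else:
            self.count += 1
        if self.downstream is not None:
            # Records and barriers are forwarded in their original order.
            self.downstream.inbox.append(element)


def run_pipeline(records, barrier_after):
    first, second = CountingOperator(), CountingOperator()
    first.downstream = second
    # The source injects the barrier after `barrier_after` records.
    first.inbox.extend(records[:barrier_after] + [BARRIER] + records[barrier_after:])
    snapshot_steps = {}
    step = 0
    while first.inbox or second.inbox:
        step += 1
        first.step()
        if step % 2 == 0:
            second.step()  # the second operator only runs every other step
        for name, op in (("first", first), ("second", second)):
            if op.snapshot is not None and name not in snapshot_steps:
                snapshot_steps[name] = step
    return first, second, snapshot_steps


first, second, snapshot_steps = run_pipeline(list(range(6)), barrier_after=3)
```

Both operators end up with a snapshot of 3, taken at different steps of the simulation: the snapshots agree on which input they cover even though they were not taken at the same moment.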

This page of the Internal docs should help to understand how the mechanism works in detail [1].

Best, Fabian

(In reply to Averell's message of 2018-08-10 14:43 GMT+02:00.)

Re: Small-files source - partitioning based on prefix of file

Thank you Fabian.
It is clear to me now. Thanks a lot for your help.


Re: Small-files source - partitioning based on prefix of file

In reply to this post by Fabian Hueske-2
Hello Fabian, and all,

Please excuse me for digging this old thread up.
I have a question regarding sending of the "barrier" messages in Flink's
checkpointing mechanism: I want to know when those barrier messages are sent
when I am using a file source. Where can I find it in the source code?

I'm still stuck with my 20,000 small files issue, where all those 20,000
files appear to the ContinuousFileMonitoringFunction at the same time.
It takes only a few seconds to list all those files, but it is expected
to take about 5 minutes to have those 20K files processed through to my sink.
Due to a resource limitation, my job fails after about 3 minutes.
What happens after that is that the job crashes, gets restored, tries to
process all 20K files from file 1 again, and ultimately fails again after 3
minutes... It goes into an infinite loop.

I think that this is the expected behaviour, as my current checkpoint config
is to checkpoint every 10s, and it took only a second or two for the listing
of those 20K files. Am I correct here? And do we have a solution for this?

Thanks and best regards,

Re: Small-files source - partitioning based on prefix of file

Fabian Hueske-2
Hi Averell,

Barriers are injected into the regular data flow by source functions.
In case of a file monitoring source, the barriers are injected into the stream of file splits that the ContinuousFileMonitoringFunction passes to the ContinuousFileReaderOperator (CFRO).
The CFRO puts the splits into a queue and processes them with a dedicated split reader thread. All state-modifying operations of the thread (emitting a record, opening a new split, etc.) are guarded by a checkpoint lock.
When the CFRO receives a barrier, the checkpointing logic requests the lock and forces the split reader thread to pause. Then it requests the current state of the thread and writes it into its checkpoint.

In order to properly checkpoint the state of the reading thread within a split, the InputFormat that is used to read the files must implement the CheckpointableInputFormat interface. Otherwise, a split will be re-read from the start after a recovery.
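
The locking pattern described above can be sketched in Python. This is a toy model only, not the Flink classes: `SplitReader`, its fields and `snapshot` are hypothetical names, and a plain `threading.Lock` plays the role of the checkpoint lock. The reader thread changes its state only while holding the lock, so a snapshot taken under the same lock always sees a consistent position, including the offset inside the split that is currently being read.

```python
import threading


class SplitReader:
    """Toy reader thread: emits records from queued splits under a shared lock."""

    def __init__(self, splits):
        self.lock = threading.Lock()  # plays the role of the checkpoint lock
        self.pending = list(splits)   # splits not opened yet
        self.current = None           # split being read
        self.offset = 0               # records already emitted from `current`
        self.emitted = []

    def run(self):
        while True:
            with self.lock:  # every state change happens under the lock
                if self.current is None:
                    if not self.pending:
                        return
                    self.current, self.offset = self.pending.pop(0), 0
                elif self.offset < len(self.current):
                    self.emitted.append(self.current[self.offset])
                    self.offset += 1
                else:
                    self.current = None  # split finished

    def snapshot(self):
        """Called by the checkpointing side; holding the lock pauses the reader."""
        with self.lock:
            remaining = [] if self.current is None else [self.current[self.offset:]]
            return {
                "emitted": len(self.emitted),
                "remaining": remaining + [list(s) for s in self.pending],
            }


splits = [[f"file{i}-line{j}" for j in range(50)] for i in range(20)]
reader = SplitReader(splits)
thread = threading.Thread(target=reader.run)
thread.start()
snapshots = [reader.snapshot() for _ in range(5)]
thread.join()
```

Whatever the thread timing, the emitted count plus the records still listed as remaining adds up to the 1,000 input records in every snapshot. Dropping the offset from the snapshot would correspond to the case where the input format cannot report its position inside a split.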

Best, Fabian

(In reply to Averell's message of Mon, 27 Aug 2018, 10:55.)

Re: Small-files source - partitioning based on prefix of file

Hello Fabian,

Thanks for the answer. However, my question is a little bit different.
Let me rephrase my example and my question:
     * I have 10,000 unsplittable small files to read, which, in total, have
       about 10M output lines.
     * From Flink's reporting web GUI, I can see that CFMF and
       ContinuousFileReaderOperator (CFRO) are reported separately.
            - CFMF needs about 10 seconds to generate all 10,000 records (as
              you said, in this case, 1 record = 1 file split).
            - CFRO generates about 2M records per minute (which means CFRO
              is processing at the rate of 2,000 files per minute)
     * I set the checkpointing interval = 1 minute.
In this example, /will the 1st barrier be injected into the stream of
file-splits 50 seconds after the 10,000th split, or after the 2,000th one?/

Sorry for being confusing.

Thanks and best regards,

Re: Small-files source - partitioning based on prefix of file

Fabian Hueske-2

The CFRO is not a source; only the file monitoring function (FMF) is. Barriers are injected by the FMF when the JobManager (JM) sends a checkpoint message. The barriers then travel to the CFRO and trigger the checkpointing there.
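
Plugging the numbers from the question into this description gives a rough back-of-the-envelope answer. The Python sketch below rests on assumptions that are not stated in the thread: the first checkpoint message arrives one full interval (60 s) after the job starts, the source emits splits at a constant rate, and the reader finishes files at a constant rate. The function names are made up for the illustration.

```python
def splits_emitted(t_seconds, total_splits=10_000, listing_seconds=10):
    """Splits emitted by the monitoring source after t seconds (assumed linear)."""
    return min(total_splits, total_splits * t_seconds // listing_seconds)


def files_read(t_seconds, total_splits=10_000, files_per_minute=2_000):
    """Files completed by the reader after t seconds (assumed constant rate)."""
    return min(total_splits, files_per_minute * t_seconds // 60)


first_trigger = 60  # checkpoint interval in seconds
barrier_position = splits_emitted(first_trigger)  # splits ahead of the barrier
read_so_far = files_read(first_trigger)           # files the reader has finished
```

Under these assumptions the source has already emitted all 10,000 splits when the first checkpoint message arrives, so the first barrier enters the split stream after the 10,000th split, about 50 seconds after it was emitted. At that moment the reader has finished roughly 2,000 files.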


(In reply to Averell's message of Tue, 28 Aug 2018, 12:02.)

Re: Small-files source - partitioning based on prefix of file

Thank you Fabian.

