Small-files source - partitioning based on prefix of file

Small-files source - partitioning based on prefix of file

Averell
Hi everyone,

We are collecting log files from tens of thousands of network nodes, and we
need to derive some data insights from them. The files come with the
corresponding node ID in the file name, and I want to do custom partitioning
using that node ID.
Right now (with Flink 1.5) I think that is not supported. I have been trying
to look into the code, but it will take some time for me to understand.
From the GUI, it looks like the first step of the file source (directory
monitoring) rebalances the stream to the 2nd step (file reader). And as per
the Flink documentation, rebalancing means round-robin. However, I could not
find a call to any "rebalance" method; instead, "transform" is called, and
there is not much information about that "transform" method.

Would it be possible to ask for some guidance on this?

Thanks for your help.
Averell




Re: Small-files source - partitioning based on prefix of file

vino yang

Hi Averell,

Flink's DataStream API provides several physical partitioning methods
(partitionCustom, shuffle, rebalance, rescale, broadcast); please see the
official guide on stream partitioning for details.

Thanks, vino.


Re: Small-files source - partitioning based on prefix of file

Averell
Thank you Vino.

Yes, I went through that official guide before posting this question. The
problem is that I could not see any call to one of those mentioned
partitioning methods (partitionCustom, shuffle, rebalance, rescale, or
broadcast) in the original readFile function. I'm still trying to look into
the code.
There should always be a way to do it, but I hope that you / someone can
help me with the easiest way - some small customization at the very place
where "directory monitoring" hands over the file splits to "file reader".

Thanks!




Re: Small-files source - partitioning based on prefix of file

vino yang
Hi Averell,

Yes, you cannot do it in the source function. I think you can call keyBy
with a key selector (based on the node ID) after the source, as in the
sketch below.
Why do you need to use a custom partitioner in the source function?
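For illustration, a minimal sketch of keying by node ID right after the
source (assuming each record carries the node ID parsed from its file name;
the LogRecord type and getNodeId() are hypothetical placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

DataStream<LogRecord> records = env.readFile(
        format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000L);

// Hash-partition by node ID: all records of one node go to the same subtask.
KeyedStream<LogRecord, String> byNode = records.keyBy(r -> r.getNodeId());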

Thanks, vino.


Re: Small-files source - partitioning based on prefix of file

Averell
Thanks Vino.

Yes, I can do that after the source function, but that means the data would
be shuffled - sent from every source subtask to the right partition.
I think that doing the partitioning within the file source would save that
shuffling.

Thanks.
Averell.




Re: Small-files source - partitioning based on prefix of file

vino yang
Hi Averell,

As far as I know, a custom partitioner will inevitably lead to a shuffle of
the data. Even if it is bundled into the logic of the source function,
would the behavior really be any different?
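For reference, this is roughly what an explicit custom partitioner after
the source looks like (a sketch only; the record type and getNodeId() are
hypothetical):

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.DataStream;

// Route each record to a partition derived from its node ID.
DataStream<LogRecord> partitioned = records.partitionCustom(
        (Partitioner<String>) (key, numPartitions) ->
                Math.floorMod(key.hashCode(), numPartitions),
        r -> r.getNodeId());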

Thanks, vino.


Re: Small-files source - partitioning based on prefix of file

Averell
Oh, thank you Vino. I was not aware of that reshuffling after every custom
partitioning. Why would that be needed, though?

Thanks and regards,
Averell




Re: Small-files source - partitioning based on prefix of file

vino yang
Hi Averell,

The keyBy transformation triggers key partitioning, which is one of the
several partitioning schemes Flink supports, and it causes the data to be
shuffled: based on the hash of the key you provide (or one generated by a
custom partitioner), records whose keys hash to the same value are routed
to the same node.
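Roughly, the routing works like this (a simplified sketch mirroring
org.apache.flink.runtime.state.KeyGroupRangeAssignment):

import org.apache.flink.util.MathUtils;

// A key is first mapped to a key group, then the key group to a subtask.
int keyGroup = MathUtils.murmurHash(key.hashCode()) % maxParallelism;
int subtaskIndex = keyGroup * parallelism / maxParallelism;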

Thanks, vino.


Re: Small-files source - partitioning based on prefix of file

Averell
Hi Vino,

I'm a little bit confused.
If I do the partitioning from within the source function, using the same
hash function on the key to identify the partition, would that be
sufficient to avoid shuffling in the subsequent keyBy call?

Thanks.
Averell




Re: Small-files source - partitioning based on prefix of file

vino yang
Hi Averell,

Actually, performing the key partitioning inside the source function is the
same as DataStream.keyBy(custom partitioner), because keyBy is not a real
operator: it is a virtual node in the DAG and does not correspond to a
physical operator.

Thanks, vino.


Re: Small-files source - partitioning based on prefix of file

Fabian Hueske-2
Hi Averell,

The records emitted by the monitoring task are "just" file splits, i.e.,
meta information that defines which data to read from where.
The reader tasks receive these splits and process them by reading the
corresponding files.

You could of course partition the splits based on the file name (or
whatever attribute); however, this is not the only thing you need to change
if you want a fault-tolerant setup.
A reader task stores the splits that it hasn't processed yet in operator
state, which is randomly redistributed when the operator recovers from a
failure (or when rescaling the application).
You would need to change the logic of the reader task as well to ensure
that the splits are deterministically assigned to reader tasks.

TBH, I would just add a keyBy() after the source. Since the monitoring task
just emits metadata, the data won't be shuffled twice.
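For the keyBy to see the node ID at all, the reader can emit the file name
together with each line. A rough sketch (this class is made up for
illustration, not an existing Flink format):

import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileInputSplit;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Emits (fileName, line) so a downstream keyBy can key on the node ID
// parsed from the file name.
public class FileTaggingFormat extends DelimitedInputFormat<Tuple2<String, String>> {

    private transient String currentFile;

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        currentFile = split.getPath().getName();
    }

    @Override
    public Tuple2<String, String> readRecord(
            Tuple2<String, String> reuse, byte[] bytes, int offset, int numBytes) {
        String line = new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
        return Tuple2.of(currentFile, line);
    }
}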

Best, Fabian


Re: Small-files source - partitioning based on prefix of file

Averell
Hi Fabian,

Thanks for the information. I will try to look at the change to that
complex logic that you mentioned when I have time. That would save one more
shuffle (from 1 to 0), wouldn't it?

BTW, regarding fault tolerance in the file reader task, could you help
explain what would happen if the reader task crashes in the middle of
reading one split? E.g.: the split has 100 lines, and the reader crashes
after reading 30 lines. What would happen when the operator is resumed?
Would those first 30 lines get reprocessed a 2nd time?

Those tens of thousands of files that I have are currently not in CSV
format. Each file has a heading section of 10-20 lines (common data for
the node), then a data section with one CSV line per record, then again
some common data, and finally a 2nd data section - one CSV line per record.
My current solution is to write a non-Flink job that preprocesses those
files into standard CSV format to be the input for Flink.

I am thinking of doing this in Flink instead, with a custom file reader
function that works similarly to the wholeTextFiles function in Spark batch
processing. However, I don't know how to achieve fault tolerance in doing
that yet.

Thank you very much for your support.

Regards,
Averell




Re: Small-files source - partitioning based on prefix of file

Fabian Hueske-2
Hi Averell,

please find my answers inlined.

Best, Fabian

2018-07-31 13:52 GMT+02:00 Averell <[hidden email]>:
Hi Fabian,

Thanks for the information. I will try to look at the change to that complex
logic that you mentioned when I have time. That would save one more shuffle
(from 1 to 0), wouldn't that?

I'm not 100% sure about that. You would need to implement it in a way that
lets you use the "reinterpret as keyed stream" feature, which is currently
experimental [1].
Not sure if that's possible.
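A rough sketch of what that would look like (assuming the records are
already partitioned exactly the way keyBy would partition them; LogRecord
and getNodeId() are hypothetical):

import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// Experimental: tells Flink the stream is already keyed, avoiding a shuffle.
KeyedStream<LogRecord, String> keyed =
        DataStreamUtils.reinterpretAsKeyedStream(records, r -> r.getNodeId());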
 

BTW, regarding fault tolerance in the file reader task, could you help
explain what would happen if the reader task crashes in the middle of
reading one split? E.g.: the split has 100 lines, and the reader crashes
after reading 30 lines. What would happen when the operator is resumed?
Would those first 30 lines get reprocessed a 2nd time?


This depends on the implementation of the InputFormat. If it implements the
CheckpointableInputFormat interface, it is able to checkpoint the current
reading position in a split and can be reset to that position during
recovery.
In either case, some records will be read twice, but if the reading
position can be reset, you can still have exactly-once state consistency
because the state is reset as well.
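As a rough sketch of that interface (simplified and only for illustration;
it assumes each small file is one unsplittable split, and the "position" is
simply the number of lines already emitted):

import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class LineCountingFormat extends FileInputFormat<String>
        implements CheckpointableInputFormat<FileInputSplit, Long> {

    private transient BufferedReader reader;
    private transient String next;
    private long linesEmitted;

    public LineCountingFormat() {
        unsplittable = true;  // one split per (small) file
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);  // opens this.stream for the split
        reader = new BufferedReader(new InputStreamReader(stream));
        linesEmitted = 0;
        next = reader.readLine();
    }

    @Override
    public boolean reachedEnd() {
        return next == null;
    }

    @Override
    public String nextRecord(String reuse) throws IOException {
        String current = next;
        next = reader.readLine();
        linesEmitted++;
        return current;
    }

    // Called when a checkpoint is taken: the state is the reading position.
    @Override
    public Long getCurrentState() {
        return linesEmitted;
    }

    // Called on recovery: re-open the split and skip what was already read.
    @Override
    public void reopen(FileInputSplit split, Long state) throws IOException {
        open(split);
        for (long i = 0; i < state; i++) {
            nextRecord(null);
        }
    }
}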
 
Those tens of thousands of files that I have are currently not in CSV
format. Each file has a heading section of 10-20 lines (common data for
the node), then a data section with one CSV line per record, then again
some common data, and finally a 2nd data section - one CSV line per record.
My current solution is to write a non-Flink job that preprocesses those
files into standard CSV format to be the input for Flink.

You can implement that with a custom FileInputFormat.
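A sketch of how such a format could look (the section-boundary detection is
a placeholder assumption; adapt isSectionMarker to the real file layout):

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class NodeLogFormat extends FileInputFormat<String> {

    private transient Iterator<String> records;

    public NodeLogFormat() {
        unsplittable = true;  // read each small file as a whole
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
        List<String> parsed = new ArrayList<>();
        boolean inDataSection = false;
        String line;
        while ((line = reader.readLine()) != null) {
            if (isSectionMarker(line)) {
                inDataSection = !inDataSection;  // toggle header/data sections
            } else if (inDataSection) {
                parsed.add(line);  // keep only the CSV data lines
            }
        }
        records = parsed.iterator();
    }

    @Override
    public boolean reachedEnd() {
        return !records.hasNext();
    }

    @Override
    public String nextRecord(String reuse) {
        return records.next();
    }

    // Placeholder: how section boundaries are detected is an assumption.
    private boolean isSectionMarker(String line) {
        return line.startsWith("#");
    }
}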
 

I am thinking of doing this in Flink instead, with a custom file reader
function that works similarly to the wholeTextFiles function in Spark batch
processing. However, I don't know how to achieve fault tolerance in doing
that yet.

Thank you very much for your support.

Regards,




Re: Small-files source - partitioning based on prefix of file

Averell
Thank you Fabian.
"/In either case, some record will be read twice but if reading position can
be reset, you can still have exactly-once state consistency because the
state is reset as well./"
I do not quite understand this statement. If I have read 30 lines from the
checkpoint and sent those 30 records to the next operator, then when the
streaming is recovered - resumed from the last checkpoint, the subsequent
operator would receive those 30 lines again, am I right?

Thanks!




Re: Small-files source - partitioning based on prefix of file

vino yang
Hi Averell,

You need to understand that Flink's recovery restores state, not records.
Of course, sometimes your records are the state, but sometimes the state is
an intermediate result derived from your records.
It depends on your business logic and your operators.

Thanks, vino.


Re: Small-files source - partitioning based on prefix of file

Fabian Hueske-2
Hi Averell,

As Vino said, checkpoints store the state of all operators of an
application.
The state of the monitoring source function is the position in the
currently read split plus all splits that have been received and are
currently pending.

In case of a recovery, the splits are restored and the source is reset to
the split that was being read when the checkpoint was taken, at the correct
reading position.
Once that is done, records that were read before are read again. However,
that does not affect the exactly-once guarantees of the operators' state,
because all of them have been reset to the same position.

Best, Fabian


Re: Small-files source - partitioning based on prefix of file

Averell
Thank you Vino and Fabian for your help in answering my questions. As my
files are small, I think there would not be much benefit in checkpointing
the file offset state.

Thanks and regards,
Averell




Re: Small-files source - partitioning based on prefix of file

Averell
Hi Fabian, Vino,

I have one more question, which I initially planned to post as a new
thread, but now I think it is better to ask here:
I need to process one big tar.gz file which contains multiple small gz
files. What is the best way to do this? I am thinking of having one
single-threaded process read the TarArchiveStream (which has been
decompressed from that tar.gz by Flink automatically), and then distribute
the TarArchiveEntry entries to a multi-threaded operator which would
process the small files in parallel. If this is feasible, which elements of
Flink can I reuse?

Thanks a lot.
Regards,
Averell




Re: Small-files source - partitioning based on prefix of file

vino yang
Hi Averell,

In this case, I think you may need to extend Flink's existing source.
First, read your large tar.gz file; once it has been decompressed, use
multi-threading inside the source to read the records, and then parse the
data format (map / flatMap might be suitable operators - you can chain them
with the source because these two operators don't require a data shuffle).

Note that Flink doesn't encourage creating extra threads in UDFs, but I
don't know if there is a better way for this scenario.
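For the single-threaded tar walk, a rough sketch using Apache Commons
Compress inside a SourceFunction (the Tuple2<entryName, bytes> output type
is just one possible choice):

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.FileInputStream;
import java.util.zip.GZIPInputStream;

// Walks the tar entries single-threaded and emits (entryName, rawBytes);
// parallel downstream operators then do the parsing.
public class TarEntrySource implements SourceFunction<Tuple2<String, byte[]>> {

    private final String path;
    private volatile boolean running = true;

    public TarEntrySource(String path) {
        this.path = path;
    }

    @Override
    public void run(SourceContext<Tuple2<String, byte[]>> ctx) throws Exception {
        try (TarArchiveInputStream tar = new TarArchiveInputStream(
                new GZIPInputStream(new FileInputStream(path)))) {
            TarArchiveEntry entry;
            while (running && (entry = tar.getNextTarEntry()) != null) {
                if (!entry.isFile()) {
                    continue;
                }
                byte[] content = new byte[(int) entry.getSize()];
                IOUtils.readFully(tar, content);  // read exactly this entry
                ctx.collect(Tuple2.of(entry.getName(), content));
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}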

Thanks, vino.


Re: Small-files source - partitioning based on prefix of file

Jörn Franke
Or you could write a custom file system for Flink... (for the tar part).
Unfortunately, gz files can only be processed single-threaded (there are
some multi-threaded implementations, but they don't bring much gain).
