Distributed reading and parsing of protobuf files from S3 in Apache Flink

Distributed reading and parsing of protobuf files from S3 in Apache Flink

ShB
I'm working with Apache Flink on reading, parsing and processing data from S3. I'm using the DataSet API, as my data is bounded and doesn't need streaming semantics.

My data is on S3 in binary protobuf format, in the form of a large number of timestamped files. Each of these files has to be read, parsed (using parseDelimitedFrom) into its custom protobuf Java class, and then processed.

I'm currently using the aws-java-sdk to read these files, as I couldn't figure out how to read binary protobufs via Flink's own mechanisms (env.readFile). But I'm getting OOM errors, as the number and size of the files is too large.

So I'm looking to do distributed/parallel reading and parsing of the files in Flink. How can these custom binary files be read from S3 using the Flink DataSet API (e.g. env.readFile)? And how can they be read from S3 in a distributed manner?

Re: Distributed reading and parsing of protobuf files from S3 in Apache Flink

Fabian Hueske-2
Hi,

whether a file can be read in parallel depends on the file format. Basically, you have to be able to identify valid offsets from which you can start reading.
There are a few techniques, such as fixed-size blocks with padding or a footer section with split offsets, but if the file has already been written and does not offer these features, there is no way to read it in parallel.

To read a file without splitting it, you can implement a custom FileInputFormat and set its "unsplittable" member field to true.
This will create one input split per file. In nextRecord(), you can then parse the file record by record.
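
A minimal sketch of this approach (not part of the original message; MyProto stands in for the user's generated protobuf class, and the field names assume Flink 1.x's FileInputFormat):

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;

import java.io.IOException;

public class ProtoFileInputFormat extends FileInputFormat<MyProto> {

    private boolean endReached;

    public ProtoFileInputFormat() {
        // never split individual files; one input split is created per file
        this.unsplittable = true;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);   // opens this.stream on the split's file
        this.endReached = false;
    }

    @Override
    public boolean reachedEnd() {
        return endReached;
    }

    @Override
    public MyProto nextRecord(MyProto reuse) throws IOException {
        // parseDelimitedFrom returns null once the stream is exhausted
        MyProto record = MyProto.parseDelimitedFrom(this.stream);
        if (record == null) {
            endReached = true;
        }
        return record;
    }
}

Used as env.readFile(new ProtoFileInputFormat(), "s3://my-bucket/"), Flink should enumerate the files under the given path and, because the format is unsplittable, create one split per file.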

Hope this helps,
Fabian

Re: Distributed reading and parsing of protobuf files from S3 in Apache Flink

ShB
Hi Fabian,

Thank you so much for your quick response, I appreciate it.

Since I'm working with a very large number of small files, I don't
necessarily need to read each individual file in parallel.

I need to read my large list of files in parallel - that is, split up my
list of files into smaller subsets and have each task manager read a subset
of them.

I implemented it like this:
env.fromCollection(fileList).rebalance().flatMap(new ReadFiles());
where ReadFiles is a flatMap function that reads each of the files from S3
using the AWS S3 Java SDK, parses it, and emits each of the protobufs.
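
For reference, a sketch of what such a ReadFiles function could look like (not taken from the original message; the bucket name and the MyProto class are placeholders, and it assumes the files arrive as String keys and the v1 aws-java-sdk already in use):

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.io.InputStream;

public class ReadFiles extends RichFlatMapFunction<String, MyProto> {

    private transient AmazonS3 s3;

    @Override
    public void open(Configuration parameters) {
        // create the S3 client once per task, not once per file
        s3 = AmazonS3ClientBuilder.defaultClient();
    }

    @Override
    public void flatMap(String key, Collector<MyProto> out) throws Exception {
        try (S3Object object = s3.getObject("my-bucket", key);
             InputStream in = object.getObjectContent()) {
            MyProto record;
            // parseDelimitedFrom returns null at the end of the stream
            while ((record = MyProto.parseDelimitedFrom(in)) != null) {
                out.collect(record);
            }
        }
    }
}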

Is this implementation an efficient way of solving this problem?

Is there a more performant way of reading a large number of files from S3 in
a distributed manner, perhaps with env.readFile()?





Re: Distributed reading and parsing of protobuf files from S3 in Apache Flink

Fabian Hueske-2
Hi,

this is a valid approach.
It might suffer from unbalanced load if the reader tasks process the files at different speeds (or the files vary in size), because each task has to process the same number of files.

An alternative would be to implement your own InputFormat.
The input format would create an InputSplit for each file to read.
At runtime, the JobManager fetches the list of all InputSplits and lazily distributes them to the running source tasks.
This automatically balances the load, because faster source tasks will process more splits than slower ones.
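
A sketch of what such an InputFormat could look like, building on the ProtoFileInputFormat sketched earlier (not part of the original message; it assumes the list of S3 paths to read is known up front and passed to the format):

import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class ProtoFileListInputFormat extends ProtoFileInputFormat {

    private final String[] filePaths;   // e.g. "s3://my-bucket/2017-07-26/part-0001.pb"

    public ProtoFileListInputFormat(String[] filePaths) {
        this.filePaths = filePaths;
        // FileInputFormat insists on some file path being set; it is effectively unused
        // here because createInputSplits() below is overridden.
        setFilePath(filePaths[0]);
    }

    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) {
        // exactly one split per file; length -1 (READ_WHOLE_SPLIT_FLAG) means "read the whole file"
        FileInputSplit[] splits = new FileInputSplit[filePaths.length];
        for (int i = 0; i < filePaths.length; i++) {
            splits[i] = new FileInputSplit(i, new Path(filePaths[i]), 0, -1, null);
        }
        return splits;
    }
}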

Best, Fabian


Re: Distributed reading and parsing of protobuf files from S3 in Apache Flink

ShB
Hi Fabian,

Thanks for your response.

If I implemented my own InputFormat, how would I read a specific list of
files from S3?

Assuming I need to use readFile(), the call below would read all of the files
under the specified S3 bucket or path:
env.readFile(new MyInputFormat(), "s3://my-bucket/")

Is there a way for me to read only a specific list/subset of files (say,
fileList) from an S3 bucket, in parallel, using readFile?




Re: Distributed reading and parsing of protobuf files from S3 in Apache Flink

Fabian Hueske-2
Hi,

readFile() expects a FileInputFormat, i.e., your custom InputFormat would need to extend FileInputFormat.
In general, an InputFormat decides what to read when it generates its InputSplits. In your case, the createInputSplits() method should return one InputSplit for each file it wants to read.
By default, FileInputFormat creates one or more input splits for each file in a directory. If you only want to read a subset of the files (or have a list of files to read), you should override the method and return exactly one input split per file to read (since your files cannot be read in parallel).

If your InputFormat does not extend FileInputFormat, you can use createInput() instead of readFile().
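
A short sketch of how either entry point could be wired up with the list-based format sketched earlier in the thread (class names, bucket, and paths are placeholders, not from the original messages):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ReadProtosJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // placeholder list of the specific S3 files that should be read
        String[] fileList = { "s3://my-bucket/2017-07-26/part-0001.pb" };

        // Option 1: the format extends FileInputFormat, so readFile() can be used.
        // Only the files returned by createInputSplits() are read, not the whole bucket;
        // the path argument is effectively ignored because createInputSplits() is overridden.
        DataSet<MyProto> records =
                env.readFile(new ProtoFileListInputFormat(fileList), "s3://my-bucket/");

        // Option 2: createInput() works for any InputFormat, file-based or not.
        // DataSet<MyProto> records = env.createInput(new ProtoFileListInputFormat(fileList));

        records.print();
    }
}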

Best, Fabian
