Writing Parquet files with Flink

Writing Parquet files with Flink

Flavio Pompermaier
Hi to all,

I was reading about optimal Parquet file size and HDFS block size.
The ideal situation for Parquet is when its block size (and thus the maximum size of each row group) is equal to the HDFS block size. By default, however, the size of Flink's output files depends on the output parallelism, so I don't know how to achieve that.
Is that feasible?

Best,
Flavio
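A minimal sketch of the alignment Flavio describes, assuming the usual Hadoop property names `parquet.block.size` (maximum Parquet row group size) and `dfs.blocksize` (HDFS block size). A plain `Map` stands in for Hadoop's `Configuration` so the example runs without Hadoop on the classpath; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: keeps the Parquet row group size and the HDFS block size equal.
// A Map<String, String> stands in for org.apache.hadoop.conf.Configuration.
class AlignedBlockSizes {
    static final String PARQUET_BLOCK_SIZE = "parquet.block.size"; // max row group size in bytes
    static final String HDFS_BLOCK_SIZE = "dfs.blocksize";         // HDFS block size in bytes

    // Returns a configuration in which both sizes are set to the same value.
    static Map<String, String> alignedConf(long blockSizeBytes) {
        if (blockSizeBytes <= 0) {
            throw new IllegalArgumentException("block size must be positive");
        }
        Map<String, String> conf = new HashMap<>();
        conf.put(PARQUET_BLOCK_SIZE, Long.toString(blockSizeBytes));
        conf.put(HDFS_BLOCK_SIZE, Long.toString(blockSizeBytes));
        return conf;
    }

    // True if both keys are present and hold the same numeric value.
    static boolean isAligned(Map<String, String> conf) {
        String parquet = conf.get(PARQUET_BLOCK_SIZE);
        String hdfs = conf.get(HDFS_BLOCK_SIZE);
        return parquet != null && hdfs != null
                && Long.parseLong(parquet) == Long.parseLong(hdfs);
    }

    public static void main(String[] args) {
        Map<String, String> conf = alignedConf(128L * 1024 * 1024); // 128 MiB
        System.out.println(conf.get(PARQUET_BLOCK_SIZE) + " " + isAligned(conf)); // 134217728 true
    }
}
```

With Hadoop available, the same two keys would typically be set on the job's `org.apache.hadoop.conf.Configuration` (for example with `setLong`) before the Parquet output format is created.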

Re: Writing Parquet files with Flink

Fabian Hueske
Hi Flavio,

using a default FileOutputFormat, Flink writes one output file for each data sink task, i.e., as many files as the defined parallelism.
The size of these files depends on the total output size and the distribution. If you write to HDFS, a file consists of one or more HDFS blocks.
Parquet files are internally also organized in blocks. Each Parquet block has a header with some metadata, and within a block the data is organized and compressed in a columnar fashion. Because of this, the ParquetInputFormat must always read a complete Parquet block.

Flink's FileInputFormats split the input data along the HDFS blocks and try to assign input splits such that blocks can be read locally. For best performance, Parquet blocks should be aligned with HDFS blocks. It is not a problem if a Parquet block is not completely filled.
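The alignment argument can be made concrete with a little offset arithmetic. The hypothetical helper below assumes fixed-size HDFS blocks starting at file offset 0 and counts how many HDFS blocks a Parquet block (row group) spans; a count above one means the task reading that Parquet block needs bytes from more than one HDFS block, which may not all be stored locally.

```java
// Hypothetical helper: offset arithmetic for a Parquet block (row group) stored in a file
// whose HDFS blocks are all hdfsBlockSize bytes long, starting at offset 0.
class RowGroupAlignment {
    // Number of HDFS blocks touched by the byte range [start, start + length).
    static long hdfsBlocksTouched(long start, long length, long hdfsBlockSize) {
        if (length <= 0) {
            return 0; // nothing to read
        }
        long firstBlock = start / hdfsBlockSize;
        long lastBlock = (start + length - 1) / hdfsBlockSize;
        return lastBlock - firstBlock + 1;
    }

    // Aligned case: the whole Parquet block can be read from a single HDFS block.
    static boolean fitsInOneHdfsBlock(long start, long length, long hdfsBlockSize) {
        return hdfsBlocksTouched(start, length, hdfsBlockSize) <= 1;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024;
        long hdfsBlock = 128 * mib;
        // 100 MiB Parquet block at offset 0: not completely filled, but still aligned.
        System.out.println(fitsInOneHdfsBlock(0, 100 * mib, hdfsBlock)); // true
        // Same Parquet block starting at 64 MiB: it spans two HDFS blocks.
        System.out.println(hdfsBlocksTouched(64 * mib, 100 * mib, hdfsBlock)); // 2
    }
}
```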

If you want to control the size of the parallel output files, you need to know the total output size and choose the parallelism accordingly.
Flink is not able to infer the output size (it depends on the input size, task semantics, data distribution, etc.), so it is up to you to choose the right parallelism.

Best, Fabian
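Since the output size has to come from the user, one way to pick the sink parallelism is a ceiling division of an estimated output size by the desired file size. This is a hypothetical helper, not a Flink API, and it assumes the records are spread evenly over the sink tasks.

```java
// Hypothetical helper: Flink cannot infer the output size, so the caller supplies an estimate.
class SinkParallelism {
    // Smallest parallelism for which evenly distributed output files stay at or below
    // targetFileBytes, clamped to the range [1, maxParallelism].
    static int forTargetFileSize(long estimatedOutputBytes, long targetFileBytes, int maxParallelism) {
        if (estimatedOutputBytes < 0 || targetFileBytes <= 0 || maxParallelism <= 0) {
            throw new IllegalArgumentException("sizes must be non-negative and limits positive");
        }
        // Ceiling division: number of target-sized files needed to hold the estimate.
        long files = (estimatedOutputBytes + targetFileBytes - 1) / targetFileBytes;
        // When clamped to maxParallelism, the files will be larger than the target.
        return (int) Math.max(1L, Math.min(files, (long) maxParallelism));
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024;
        // 10 GiB of estimated output, 128 MiB target file size, at most 200 slots.
        System.out.println(forTargetFileSize(10L * 1024 * mib, 128 * mib, 200)); // 80
    }
}
```

In a DataSet program the result could be applied to the sink alone, e.g. `dataSet.output(format).setParallelism(p)`, leaving the rest of the job at its own parallelism. Skewed data still yields uneven files, because the estimate assumes an even distribution.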


In reply to Flavio Pompermaier, 2016-01-28 16:12 GMT+01:00

Re: Writing Parquet files with Flink

Flavio Pompermaier
Hi Fabian,
thanks for the response!
As I understand it (correct me if I'm wrong), once I produce a Parquet directory that I want to read later, the number of files in the directory affects the initial parallelism of the next job, i.e.:
 - If I have fewer files than available tasks, I will not fully exploit parallelism
 - If there are more Parquet files than tasks, the tasks will read the files as soon as they can (at maximum parallelism, but depending on the speed of the pipeline)

Having a single huge Parquet file could limit the performance of my Flink job because the default Hadoop InputFormat can't exploit parallelism at the data source (since it relies only on the number of files found). To avoid that, I would have to write a custom ParquetInputFormat that preprocesses the Parquet metadata of all those files, extracts the HDFS blocks to read, and then generates the InputSplits. Am I right, or am I misunderstanding something?

Best,
Flavio
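For illustration, the split-planning step of such a custom input format might group row groups by HDFS block. The sketch below is hypothetical: `RowGroup` is a stand-in for the offset and length a real implementation would read from each file footer, and the rule used here (a row group belongs to the HDFS block containing its midpoint) is one simple assumed choice, not the method of any particular library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical split planner: each row group is assigned to the HDFS block that
// contains its midpoint, and each HDFS block that receives row groups becomes one split.
class RowGroupSplitPlanner {
    // Minimal stand-in for per-row-group footer metadata.
    static final class RowGroup {
        final long offset; // byte offset of the row group in the file
        final long length; // total size of the row group in bytes

        RowGroup(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    // Maps HDFS block index -> row groups its split should read (sorted by block index).
    static Map<Long, List<RowGroup>> plan(List<RowGroup> rowGroups, long hdfsBlockSize) {
        Map<Long, List<RowGroup>> splits = new TreeMap<>();
        for (RowGroup rg : rowGroups) {
            long midpoint = rg.offset + rg.length / 2;
            long hdfsBlock = midpoint / hdfsBlockSize;
            splits.computeIfAbsent(hdfsBlock, k -> new ArrayList<>()).add(rg);
        }
        return splits;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024;
        // Parquet data starts after the 4-byte "PAR1" magic number.
        List<RowGroup> groups = Arrays.asList(
                new RowGroup(4, 120 * mib),
                new RowGroup(4 + 120 * mib, 120 * mib),
                new RowGroup(4 + 240 * mib, 40 * mib));
        System.out.println(plan(groups, 128 * mib).keySet()); // [0, 1, 2]
    }
}
```

In a Flink `InputFormat`, this grouping would live in `createInputSplits`, with one split object per map entry.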

In reply to Fabian Hueske, Fri, Jan 29, 2016 at 11:14 AM

Re: Writing Parquet files with Flink

Fabian Hueske
The number of input splits does not depend on the number of files but on the number of HDFS blocks of all files.
Both a single file with 100 HDFS blocks and 100 files with one block each should be divided into 100 input splits, which can be read concurrently by 100 tasks (or by fewer tasks with lazy split assignment).

If you get fewer splits than HDFS blocks, you should check the implementation of the createInputSplits() method in your InputFormat.

Best, Fabian
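A simplified model of the split count described above: one split per HDFS block, summed over all files. The class is hypothetical and ignores the minimum-split hint that a real input format receives, so it only illustrates that the file count itself does not matter.

```java
import java.util.Arrays;

// Simplified, hypothetical model of block-based split generation: one split per HDFS
// block of every file. It ignores the minimum-split hint a real input format receives.
class BlockSplits {
    static long countSplits(long[] fileLengths, long hdfsBlockSize) {
        long splits = 0;
        for (long len : fileLengths) {
            // ceil(len / hdfsBlockSize); an empty file contributes no split in this model
            splits += (len + hdfsBlockSize - 1) / hdfsBlockSize;
        }
        return splits;
    }

    public static void main(String[] args) {
        long block = 128L * 1024 * 1024;
        long[] oneLargeFile = {100 * block};   // 1 file, 100 HDFS blocks
        long[] manyFiles = new long[100];      // 100 files, 1 HDFS block each
        Arrays.fill(manyFiles, block);
        System.out.println(countSplits(oneLargeFile, block) + " " + countSplits(manyFiles, block)); // 100 100
    }
}
```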

In reply to Flavio Pompermaier, 2016-01-29 11:49 GMT+01:00

Re: Writing Parquet files with Flink

Flavio Pompermaier
So, from Flink's point of view, there's no need to worry about the number or size of the Parquet files as long as I set the Parquet block size correctly (equal to the HDFS block size)...
It only affects the Parquet file overhead (header and footer present in each file) and the HDFS resources required to handle them (one NameNode object for each HDFS file), right?
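To put rough numbers on this overhead, here is a hypothetical back-of-the-envelope model. It assumes the HDFS NameNode tracks roughly one object per file plus one per block, and that every Parquet file carries one footer, so the footer count equals the file count.

```java
import java.util.Arrays;

// Rough, hypothetical cost model: one NameNode object per file, one per HDFS block,
// and one Parquet footer per file (so footers == number of files).
class SmallFileOverhead {
    static long namespaceObjects(long[] fileLengths, long hdfsBlockSize) {
        long objects = 0;
        for (long len : fileLengths) {
            objects += 1;                                         // the file itself
            objects += (len + hdfsBlockSize - 1) / hdfsBlockSize; // its blocks
        }
        return objects;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024;
        long[] fewLargeFiles = new long[80];     // 10 GiB as 80 x 128 MiB
        Arrays.fill(fewLargeFiles, 128 * mib);
        long[] manySmallFiles = new long[10240]; // 10 GiB as 10,240 x 1 MiB
        Arrays.fill(manySmallFiles, mib);
        System.out.println(namespaceObjects(fewLargeFiles, 128 * mib) + " "
                + namespaceObjects(manySmallFiles, 128 * mib)); // 160 20480
    }
}
```

Both totals grow with the number of files rather than with the amount of data, which is why the overhead only becomes noticeable with many files smaller than a block.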

In reply to Fabian Hueske, Fri, Jan 29, 2016 at 11:56 AM

Re: Writing Parquet files with Flink

Fabian Hueske
Yes, make both block sizes the same and you're good.
I think you can neglect the overhead, unless we are talking about thousands of small files (smaller than the block size).

In reply to Flavio Pompermaier, 2016-01-29 12:06 GMT+01:00