Flink parquet read.write performance


Flink parquet read.write performance

Newport, Billy

Hi,

 

I’m trying to figure out why reading and writing ~5 GB worth of Parquet files seems to take 3-4 minutes with 10 TaskManagers, 2 slots, 20 GB memory, and parallelism 20. I’ve copied in the execution plan and the TaskManager times below. Other details: we’re reading 20 snappy-compressed Parquet files, each ~240 MB (see below).

 

I’m trying to use this for milestoning logic where we take new Avro files from staging and join them with the existing milestoned Parquet data. I have a small staging file with only about 1,500 records inside, so I reduce the number of records sent to the coGroup in order to make this faster. To do this, I’m basically reading GenericRecords from the Parquet files twice, once to filter for “live” records, which we then further filter to the ones with keys matching what we found in a separate Avro file. With that reduction of records, that part of the plan totals 1 minute 58 seconds.

 

The concern is the other records, the non-live/non-matching-key ones. In theory, I expect this to be fast since it’s just chaining the operations all the way through to the sink. However, this part takes about 4 minutes. We’re not doing anything different from the other DataSource aside from mapping a DataSet<GenericRecord> to a Tuple2<Short,GenericRecord>, where the short is a bitmap value indicating where the record needs to be written.

 

Other notes:

I checked the backpressure on the datasource->filter->map->map and it was OK. I’m not sure what else could be holding it up.

I also profiled it when I ran it on a single TaskManager with a single slot, and it seems to spend most of the time waiting.

 

Any ideas? Instead of truly chaining, is it writing to disk and serializing multiple times inside each operation?
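(For reference, a rough sketch of the shape of that pass-through branch; the input format, predicate, routing helper, and sink names below are illustrative placeholders, not the actual job code.)

// Illustrative only: the non-live/non-matching branch described above.
// ParquetGenericRecordInputFormat, isLive(), routingBitmap(), and parquetSink are hypothetical names.
DataSet<GenericRecord> existing =
        env.readFile(new ParquetGenericRecordInputFormat(), parquetFolder);

existing
    .filter(r -> !isLive(r))                                   // records untouched by staging
    .map(r -> Tuple2.of(routingBitmap(r), r))                  // short bitmap -> target output
    .returns(new TypeHint<Tuple2<Short, GenericRecord>>() {})  // help type extraction for the lambda
    .output(parquetSink);                                      // written straight back out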

 

Data Source :

hdfs dfs -du -h <folder_name>

240.2 M  <folder_name>/0_partMapper-m-00013.snappy.parquet

237.2 M  <folder_name>/10_partMapper-m-00019.snappy.parquet

241.9 M  <folder_name>/11_partMapper-m-00002.snappy.parquet

243.3 M  <folder_name>/12_partMapper-m-00000.snappy.parquet

238.2 M  <folder_name>/13_partMapper-m-00016.snappy.parquet

241.7 M  <folder_name>/14_partMapper-m-00003.snappy.parquet

241.0 M  <folder_name>/15_partMapper-m-00006.snappy.parquet

240.3 M  <folder_name>/16_partMapper-m-00012.snappy.parquet

240.3 M  <folder_name>/17_partMapper-m-00011.snappy.parquet

239.5 M  <folder_name>/18_partMapper-m-00014.snappy.parquet

237.6 M  <folder_name>/19_partMapper-m-00018.snappy.parquet

240.7 M  <folder_name>/1_partMapper-m-00009.snappy.parquet

240.7 M  <folder_name>/20_partMapper-m-00008.snappy.parquet

236.5 M  <folder_name>/2_partMapper-m-00020.snappy.parquet

242.1 M  <folder_name>/3_partMapper-m-00001.snappy.parquet

241.7 M  <folder_name>/4_partMapper-m-00004.snappy.parquet

240.5 M  <folder_name>/5_partMapper-m-00010.snappy.parquet

241.7 M  <folder_name>/6_partMapper-m-00005.snappy.parquet

239.1 M  <folder_name>/7_partMapper-m-00015.snappy.parquet

237.9 M  <folder_name>/8_partMapper-m-00017.snappy.parquet

240.8 M  <folder_name>/9_partMapper-m-00007.snappy.parquet

 

 

yarn-session.sh -nm "delp_uat-IMD_Trading_v1_PROD_PerfTest-REFINER_INGEST"  -jm 4096 -tm 20480 -s 2 -n 10  -d

 

 

[inline image: image001.png]

 

[inline image: image002.png]

 

 

Thanks,

 

Regina

 


Re: Flink parquet read.write performance

Aljoscha Krettek
Hi Billy,

Do you also have the data (picture) from the "Timeline" tab of the completed job? This would give some hints about how long that other DataSource (with chain) was active. It might be that the sink is waiting for the other input to come online.

Best,
Aljoscha



RE: Flink parquet read.write performance

Newport, Billy

There you go.

 


Attachments: Order_Flink_Job.png (113K), Order_Flink_Tree.png (102K)

RE: Flink parquet read.write performance

Chan, Regina
In reply to this post by Aljoscha Krettek

We profiled it, and it looks like it’s sending the output of the datasource->filter->map->map to an intermediate result partition instead of writing directly to the data sink. Because of this, we think it’s slow: it’s spending its time serializing for no reason. Why does it do the forward rather than chain to the data sink?

 

 

Thanks,

Regina

 



Re: Flink parquet read.write performance

Aljoscha Krettek
Hi,

The Sink cannot be chained to the previous two operations because there are two operations. Chaining only works if there is one predecessor operation. Data transfer should still be pipelined but you will see serialisation overhead. What kind of TypeSerializer is used at that boundary?

Best,
Aljoscha
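(A side note, not from the original thread: one quick way to answer that question is to ask Flink directly which serializer it picks for the boundary type. A minimal sketch, assuming the records at the boundary are Tuple2<Short, GenericRecord>:)

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;

public class SerializerCheck {
    public static void main(String[] args) {
        // Which TypeInformation / TypeSerializer would Flink use for the records crossing the boundary?
        TypeInformation<Tuple2<Short, GenericRecord>> info =
                TypeInformation.of(new TypeHint<Tuple2<Short, GenericRecord>>() {});
        TypeSerializer<Tuple2<Short, GenericRecord>> serializer =
                info.createSerializer(new ExecutionConfig());

        System.out.println("TypeInformation: " + info);
        System.out.println("TypeSerializer : " + serializer.getClass().getName());
        // GenericRecord is usually treated as a generic type and falls back to Kryo,
        // which tends to be the expensive part of such a serialization boundary.
    }
}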


RE: Flink parquet read.write performance

Newport, Billy

Thanks Aljoscha for the prompt response.

 

Can you explain the technical reason for the single-predecessor rule? It makes what we are trying to do much more expensive. Really, what we’re doing is reading a Parquet file, doing several maps/filters on the records, and writing back to Parquet. No serialization besides the Parquet operations is needed at all. The current Flink implementation adds an expensive serialize/deserialize step for no apparent purpose.

 

Billy

 

 

 



Re: Flink parquet read.write performance

Aljoscha Krettek
Hi,

The reason is that there are two (or more) different Threads doing the reading. As an illustration, consider first this case:

DataSet input = ...
input.map(new MapA()).map(new MapB())

Here, MapB is technically "wrapped" by MapA, and when MapA emits data it goes directly to the map() method of MapB. The two functions are chained.

Now, in this other case the methods cannot be chained:

DataSet input1= ...
DataSet input2 = ...
DataSet mappedA = input1.map(new MapA())
DataSet mappedB = input2.map(new MapB())

mappedA.union(mappedB).map(new MapC())

Here, there is (at least) one thread per map because neither MapA nor MapB could wrap MapC in a way that still lets the other one send data into MapC. Data is sent across a channel between the threads, and whenever that happens the data is serialised.

Technically, we could avoid serialization if we knew that two Threads are running in the same JVM but this is not something that Flink currently does.

Best,
Aljoscha




Re: Flink parquet read.write performance

Stephan Ewen
Hi!

The sink is merely a union of the result of the co-group and the one data source.
Can't you just make two distinct pipelines out of that? One co-group -> data sink pipeline and one source -> sink pipeline?
They could even be part of the same job...

Best,
Stephan
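(A minimal sketch of that idea, with placeholder input formats, key selectors, helpers, and sinks; both pipelines live in the same job and are submitted with a single execute() call.)

// Illustrative only: two distinct pipelines in one DataSet job.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Pipeline 1: records affected by staging go through the coGroup and into one sink.
DataSet<GenericRecord> existingForJoin = env.readFile(parquetInputFormat, existingPath);
DataSet<GenericRecord> staging = env.readFile(avroInputFormat, stagingPath);
existingForJoin
    .filter(r -> isLive(r))
    .coGroup(staging).where(keySelector).equalTo(keySelector)
    .with(milestoningCoGroup)
    .output(milestonedSink);

// Pipeline 2: everything else is copied straight through its own source -> filter -> map -> sink,
// so that chain has a single predecessor at every step.
DataSet<GenericRecord> existingForCopy = env.readFile(parquetInputFormat, existingPath);
existingForCopy
    .filter(r -> !isLive(r))
    .map(r -> Tuple2.of(routingBitmap(r), r))
    .returns(new TypeHint<Tuple2<Short, GenericRecord>>() {})
    .output(passthroughSink);

env.execute("milestoning");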





RE: Flink parquet read.write performance

Newport, Billy

If we use two sinks with the same folder, then we get file name collisions between the two sinks.

 

It sounds like even if we did that, Flink isn’t capable of chaining it regardless, no?

 

We find ourselves having to manually optimize the data flow quite a bit, to tell you the truth. For example:

 

FileDataset -> [Filter True -> Flow; Filter False -> Flow 2]

 

is slower than reading the file twice, i.e. FileDataset -> Filter True -> Flow, plus another flow FileDataset -> Filter False -> Flow 2.
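(A minimal sketch of those two shapes, with placeholder names; the point is only where the fan-out happens, not the actual job code.)

// Shape A: one DataSource fanned out to two filters.
// The source has two successors, so it generally cannot chain to either branch.
DataSet<GenericRecord> shared = env.readFile(parquetInputFormat, path);
shared.filter(r -> predicate(r)).output(sinkA);
shared.filter(r -> !predicate(r)).output(sinkB);

// Shape B: read the file twice, one DataSource per branch.
// Each source then has a single successor and can chain source -> filter -> sink.
env.readFile(parquetInputFormat, path).filter(r -> predicate(r)).output(sinkA);
env.readFile(parquetInputFormat, path).filter(r -> !predicate(r)).output(sinkB);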

 

 

 



RE: Flink parquet read.write performance

Newport, Billy
In reply to this post by Aljoscha Krettek

We saw the function wrapping when we were debugging it, and that’s what surprised us: it suddenly serialized rather than calling the writer, and the records were physically written in a separate JVM.

 

 



Re: Flink parquet read.write performance

clay4444
In reply to this post by Newport, Billy
Hi Regina,

I've just started using Flink, and recently I've been asked to store Flink data on HDFS in Parquet format. I've found several examples on GitHub and in the community, but there are always bugs. I see your storage directory, and that's exactly what I want, so I'd like to ask you to reply with a slightly more complete example or two.

The result I'm after is similar to the following:

237.6 M  <folder_name>/19_partMapper-m-00018.snappy.parquet

240.7 M  <folder_name>/1_partMapper-m-00009.snappy.parquet

240.7 M  <folder_name>/20_partMapper-m-00008.snappy.parquet

236.5 M  <folder_name>/2_partMapper-m-00020.snappy.parquet


I look forward to your reply. Thank you very much!




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
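(The thread ends here without the requested code. For reference, a minimal sketch of one commonly used way to write GenericRecords as snappy-compressed Parquet from the DataSet API: it wraps parquet-avro's AvroParquetOutputFormat in Flink's Hadoop-compatibility HadoopOutputFormat. The schema, output path, and class names are placeholders, parquet-avro 1.8+ is assumed for the generic AvroParquetOutputFormat<T>, and the caller still has to run env.execute().)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetSinkSketch {

    // Writes the given records as snappy-compressed Parquet files under outputFolder.
    public static void writeParquet(DataSet<GenericRecord> records,
                                    Schema schema,
                                    String outputFolder) throws Exception {
        Job job = Job.getInstance();
        AvroParquetOutputFormat.setSchema(job, schema);
        ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
        FileOutputFormat.setOutputPath(job, new Path(outputFolder));

        HadoopOutputFormat<Void, GenericRecord> parquetFormat =
                new HadoopOutputFormat<>(new AvroParquetOutputFormat<GenericRecord>(), job);

        // Hadoop OutputFormats consume key/value pairs; the key is unused for Parquet.
        records.map(r -> new Tuple2<Void, GenericRecord>(null, r))
               .returns(new TypeHint<Tuple2<Void, GenericRecord>>() {})
               .output(parquetFormat);
    }
}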