Flink Parquet Read/Write Performance


Chan, Regina

Hi,

I’m trying to figure out why reading and writing ~5 GB worth of Parquet files takes 3-4 minutes with 10 TaskManagers, 2 slots each, 20 GB of memory, and a parallelism of 20. I’ve copied the execution plan and the TaskManager times below. Another detail: we’re reading 21 Snappy-compressed Parquet files of ~240 MB each (see the listing below).
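For context, the read path looks roughly like the following. This is a minimal sketch assuming parquet-avro’s AvroParquetInputFormat wrapped in Flink’s HadoopInputFormat; paths and schema handling are elided.

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.avro.AvroParquetInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Job job = Job.getInstance();
AvroParquetInputFormat.addInputPath(job, new Path("hdfs:///<folder_name>"));

// Each input record arrives as Tuple2<Void, GenericRecord>; drop the Void key.
DataSet<GenericRecord> records = env
    .createInput(new HadoopInputFormat<>(
        new AvroParquetInputFormat<GenericRecord>(), Void.class, GenericRecord.class, job))
    .map(new MapFunction<Tuple2<Void, GenericRecord>, GenericRecord>() {
        @Override
        public GenericRecord map(Tuple2<Void, GenericRecord> value) {
            return value.f1;
        }
    });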


I’m using this for milestoning logic: we take new Avro files from staging and join them against the existing milestoned Parquet data. The staging file is small, only about 1,500 records, so I reduce the number of records sent to the coGroup to make it faster. To do this, I read GenericRecords from the Parquet files twice: once to filter for “live” records, which we then further filter down to the ones whose keys match what we found in the separate Avro file. With that reduction, this part of the plan totals 1 minute 58 seconds.
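The shape of that part of the plan is roughly as follows. This is a sketch only: the field names "outDate" and "key" are placeholders, not our real schema, and the key-match filter against the staging set is elided.

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.util.Collector;

// Keep only "live" rows (placeholder condition), then coGroup with staging.
DataSet<GenericRecord> live = parquetRecords
    .filter(r -> r.get("outDate") == null);

DataSet<GenericRecord> milestoned = live
    .coGroup(stagingRecords)
    .where(new KeySelector<GenericRecord, String>() {
        @Override
        public String getKey(GenericRecord r) { return r.get("key").toString(); }
    })
    .equalTo(new KeySelector<GenericRecord, String>() {
        @Override
        public String getKey(GenericRecord r) { return r.get("key").toString(); }
    })
    .with(new CoGroupFunction<GenericRecord, GenericRecord, GenericRecord>() {
        @Override
        public void coGroup(Iterable<GenericRecord> existing,
                            Iterable<GenericRecord> incoming,
                            Collector<GenericRecord> out) {
            // Milestoning: close out the prior version, emit the new one (elided).
        }
    });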


The concern is the other path: the records that are non-live or whose keys don’t match. In theory I expect this to be fast, since the operators simply chain all the way through to the sink; in practice this part takes about 4 minutes. We’re not doing anything different from the other data source, aside from mapping a DataSet<GenericRecord> to a Tuple2<Short, GenericRecord>, where the Short is a bitmap value indicating where the record needs to be written.
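That map is essentially the following (a sketch; the routing rules behind the bitmap bits are placeholders):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

DataSet<Tuple2<Short, GenericRecord>> tagged = records
    .map(new MapFunction<GenericRecord, Tuple2<Short, GenericRecord>>() {
        @Override
        public Tuple2<Short, GenericRecord> map(GenericRecord record) {
            short bitmap = 0;
            // Placeholder routing rule: bit 0 and bit 1 select the target outputs.
            if (record.get("outDate") == null) {
                bitmap |= 0x1;
            } else {
                bitmap |= 0x2;
            }
            return new Tuple2<>(bitmap, record);
        }
    });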


Other notes:

- I checked the backpressure on the data source -> filter -> map -> map chain and it was OK. I’m not sure what else could be holding it up.

- I also profiled a run on a single TaskManager with a single slot, and it seems to spend most of the time waiting.

Any ideas? Instead of truly chaining, is it spilling to disk and serializing multiple times inside each operation?
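For reference, the one knob I’m aware of that affects copying between chained operators is object reuse on the ExecutionConfig; we have not enabled it on this job. A minimal sketch:

import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// With object reuse enabled, chained operators pass records along without
// making defensive copies between each step. Use with care: user functions
// must not hold on to input objects across calls.
env.getConfig().enableObjectReuse();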


Data Source:

hdfs dfs -du -h <folder_name>
240.2 M  <folder_name>/0_partMapper-m-00013.snappy.parquet
237.2 M  <folder_name>/10_partMapper-m-00019.snappy.parquet
241.9 M  <folder_name>/11_partMapper-m-00002.snappy.parquet
243.3 M  <folder_name>/12_partMapper-m-00000.snappy.parquet
238.2 M  <folder_name>/13_partMapper-m-00016.snappy.parquet
241.7 M  <folder_name>/14_partMapper-m-00003.snappy.parquet
241.0 M  <folder_name>/15_partMapper-m-00006.snappy.parquet
240.3 M  <folder_name>/16_partMapper-m-00012.snappy.parquet
240.3 M  <folder_name>/17_partMapper-m-00011.snappy.parquet
239.5 M  <folder_name>/18_partMapper-m-00014.snappy.parquet
237.6 M  <folder_name>/19_partMapper-m-00018.snappy.parquet
240.7 M  <folder_name>/1_partMapper-m-00009.snappy.parquet
240.7 M  <folder_name>/20_partMapper-m-00008.snappy.parquet
236.5 M  <folder_name>/2_partMapper-m-00020.snappy.parquet
242.1 M  <folder_name>/3_partMapper-m-00001.snappy.parquet
241.7 M  <folder_name>/4_partMapper-m-00004.snappy.parquet
240.5 M  <folder_name>/5_partMapper-m-00010.snappy.parquet
241.7 M  <folder_name>/6_partMapper-m-00005.snappy.parquet
239.1 M  <folder_name>/7_partMapper-m-00015.snappy.parquet
237.9 M  <folder_name>/8_partMapper-m-00017.snappy.parquet
240.8 M  <folder_name>/9_partMapper-m-00007.snappy.parquet

yarn-session.sh -nm "delp_uat-IMD_Trading_v1_PROD_PerfTest-REFINER_INGEST" -jm 4096 -tm 20480 -s 2 -n 10 -d

Thanks,

Regina