Strange filter performance with parquet


Strange filter performance with parquet

Newport, Billy

We're reading a parquet file (550m records).

We want to split the parquet using a filter into 2 sets, live and dead.

DataSet a = read parquet
DataSet live = a.filter(liveFilter)
DataSet dead = a.filter(deadFilter)

is slower than

DataSet a = read parquet
DataSet live = a.filter(liveFilter)
DataSet b = read parquet
DataSet dead = b.filter(deadFilter)

Does this make sense? Why would reading it twice be quicker? We're using Flink 1.1.2.

 

 

Billy Newport

Data Architecture, Goldman, Sachs & Co.
30 Hudson | 37th Floor | Jersey City, NJ

Tel: +1 (212) 855-7773 | Cell: +1 (507) 254-0134
Email: [hidden email], KD2DKQ

 


Re: Strange filter performance with parquet

Fabian Hueske-2
Hi Billy,

this might depend on what you are doing with the live and dead DataSets later on.
For example, if you join both data sets, Flink might need to spill one of them to disk and read it back to avoid a deadlock.
This happens, for instance, if the join strategy is a HashJoin, which blocks one input (the probe side) until the other (the build side) has been fully consumed.
If both join inputs originate from the same source (the Parquet input), that source cannot be blocked, so the probe side must be consumed by spilling it to disk.
Spilling to disk and reading the result back might be more expensive than reading the original input twice.

You can also check the DataSet execution plan by calling getExecutionPlan() on the ExecutionEnvironment.

Best, Fabian

2017-02-07 21:10 GMT+01:00 Newport, Billy <[hidden email]>:


RE: Strange filter performance with parquet

Newport, Billy

It's kind of like this:

DataSet live = from previous
DataSet newRecords = avro read
DataSet mergedLive = live.cogroup(newRecords)
DataSet result = mergedLive.union(deadRecords)
Store result to parquet.

BTW, on another point: reading Parquet files seems very slow to me. Writing is very fast in comparison. It takes 60 slots 10 minutes to read 550 million records from a Parquet file. We have MR jobs that finish processing in 8.5 minutes with 33 cores, so this is much slower than what's possible.

 

 

From: Fabian Hueske [mailto:[hidden email]]
Sent: Tuesday, February 07, 2017 3:26 PM
To: [hidden email]
Subject: Re: Strange filter performance with parquet

 



Re: Strange filter performance with parquet

Fabian Hueske-2
Hmm, the plan you posted does not look like it would need to spill data to avoid a deadlock.
Not sure what's causing the slowdown.

How do you read Parquet files?
If you use the HadoopIF wrapper, this might add some overhead.
A dedicated Flink InputFormat for Parquet might help here.

2017-02-07 21:32 GMT+01:00 Newport, Billy <[hidden email]>:




RE: Strange filter performance with parquet

Newport, Billy

We read them like this:

Job job = Job.getInstance();

AvroParquetInputFormat<GenericRecord> inputFormat = new AvroParquetInputFormat<GenericRecord>();
AvroParquetInputFormat.setAvroReadSchema(job, getOutputSchema(datasetName));

String storeName = getCurrentRefinerInfo().getDatastoreName();
Schema schema = getMergeSchema(storeName, datasetName);
SerializableAvroRecordBuilder mergeRecordBuilder = new SerializableAvroRecordBuilder(storeName, datasetName, schema);

Path path = new Path(datasetMergeDir);
DataSet<Tuple2<Void, GenericRecord>> d = getExecutionEnvironment()
        .readHadoopFile(inputFormat, Void.class, GenericRecord.class, path.toString(), job)
        .filter(new FilterFunction<Tuple2<Void, GenericRecord>>() { /* this does the live/dead filtering… */ });

 

 

 

 

From: Fabian Hueske [mailto:[hidden email]]
Sent: Tuesday, February 07, 2017 3:56 PM
To: [hidden email]
Subject: Re: Strange filter performance with parquet

 



Re: Strange filter performance with parquet

Fabian Hueske-2
I'm not familiar with the details of Parquet and Avro, but I know that the handling of GenericRecord is very inefficient in Flink.
The reason is that the records are serialized using Kryo and always carry the full Avro schema. If you can provide a specific record class to the InputFormat, Flink will serialize it much more efficiently as a POJO type.
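[Editor's note] To put rough numbers on the cost of shipping the schema with every record, here is a back-of-the-envelope sketch in plain Java (no Flink or Avro dependency; the schema JSON, the field layout, and the record count are illustrative assumptions, not measurements from the thread):

```java
import java.nio.charset.StandardCharsets;

public class SchemaOverheadSketch {
    public static void main(String[] args) {
        // A stand-in for a full Avro schema; real production schemas are
        // usually far larger than this toy one-field record.
        String schemaJson =
            "{\"type\":\"record\",\"name\":\"Trade\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}";
        int schemaBytes = schemaJson.getBytes(StandardCharsets.UTF_8).length;
        int payloadBytes = 8;          // one long field per record
        long records = 550_000_000L;   // dataset size mentioned in the thread

        // Naive serialization: the full schema travels with every record.
        long naive = records * (schemaBytes + payloadBytes);
        // Fingerprint-style serialization: an 8-byte schema id per record.
        long fingerprinted = records * (8 + payloadBytes);

        System.out.println("schema bytes per record: " + schemaBytes);
        System.out.println("naive total bytes:       " + naive);
        System.out.println("fingerprint total bytes: " + fingerprinted);
        System.out.println("ratio:                   " + (double) naive / fingerprinted);
    }
}
```

With a realistically large schema the ratio grows accordingly, which is why replacing the per-record schema with a small fingerprint pays off so much.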

2017-02-07 22:08 GMT+01:00 Newport, Billy <[hidden email]>:




RE: Strange filter performance with parquet

Newport, Billy

Clicked send too early: the serializer we wrote also caches a DatumReader/DatumWriter for each known schema and serializes the GenericRecords using the Avro encoder/decoder, so it's not slow, but not as fast as custom serializers would be.

 

 

From: Fabian Hueske [mailto:[hidden email]]
Sent: Tuesday, February 07, 2017 4:17 PM
To: [hidden email]
Subject: Re: Strange filter performance with parquet

 



RE: Strange filter performance with parquet

Newport, Billy
In reply to this post by Fabian Hueske-2

We've fixed that: we wrote a custom Kryo serializer which writes just a long fingerprint, and we keep the usual bi-directional fingerprint-to-schema map. According to Flink metrics this reduces memory usage by 20x. It does require us to ship the schema info around with the DFG so that the cache is primed before records are read or written in that JVM.
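[Editor's note] A minimal sketch of the fingerprint map idea, in plain Java with a CRC32 stand-in for a real 64-bit schema fingerprint. A real implementation would fingerprint org.apache.avro.Schema objects (e.g. via Avro's SchemaNormalization) and cache DatumReader/DatumWriter instances per schema; the class and method names here are hypothetical, and a JSON string stands in for the Schema:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.CRC32;

public class SchemaRegistry {
    private final Map<Long, String> byFingerprint = new ConcurrentHashMap<>();

    // Register a schema and return its fingerprint (CRC32 stand-in).
    public long register(String schemaJson) {
        CRC32 crc = new CRC32();
        crc.update(schemaJson.getBytes(StandardCharsets.UTF_8));
        long fp = crc.getValue();
        byFingerprint.put(fp, schemaJson);
        return fp;
    }

    // The serializer writes only the fingerprint per record; on read, the full
    // schema is looked up locally. This only works if every JVM primes the
    // cache first, which is why the schema info must travel with the job.
    public String lookup(long fingerprint) {
        String schema = byFingerprint.get(fingerprint);
        if (schema == null) {
            throw new IllegalStateException("schema cache not primed for " + fingerprint);
        }
        return schema;
    }

    public static void main(String[] args) {
        SchemaRegistry registry = new SchemaRegistry();
        long fp = registry.register("{\"type\":\"string\"}");
        System.out.println("fingerprint=" + fp + " schema=" + registry.lookup(fp));
    }
}
```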

 

 

 
