Flink performance tuning on operators

Ivan Yang
Hi,

We have a Flink job that reads data from an input stream, converts each event from a JSON string to an Avro object, and finally writes Parquet files using StreamingFileSink with OnCheckpointRollingPolicy and a 5-minute interval. It is basically a stateless job. Initially, we used one map operator to convert the JSON string to an Avro object; inside the map function, each event goes from String -> JsonObject -> Avro object.

        DataStream<AvroSchema> avroData = data.map(new JsonToAVRO());

We then tried breaking the map operator into two: one for String to JsonObject and another for JsonObject to Avro.

        DataStream<JsonObject> JsonData = data.map(new StringToJson());
        DataStream<AvroSchema> avroData = rawDataAsJson.map(new JsonToAvroSchema());

The benchmark shows a significant performance hit when the conversion is split into two operators. We are trying to understand which Flink internals cause such a big difference. The setup uses the filesystem state backend, with checkpoints stored in an S3 bucket. Our event object has 300+ attributes.


Thanks
Ivan

Re: Flink performance tuning on operators

Chesnay Schepler
Generally there should be no difference.
Can you check whether the maps are running as a chain (as a single task)?
If they are running in a chain, then I would suspect that something else is skewing your results.
If not, then the added network/serialization pressure would explain it.
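One way to check this, sketched under assumptions against the Flink 1.x DataStream API: give both map operators explicit names. When they are chained, the logs and the Web UI show a single task whose name contains `StringToJson -> JsonToAvro`; when they are not, you see two tasks with a network edge between them. Among other conditions, chaining requires the two operators to have the same parallelism and to be connected by a plain forward edge (no `keyBy`, `rebalance`, and so on). The lambdas are placeholders for the question's `StringToJson` and `JsonToAvroSchema` functions, which were not shown, and `ChainingCheck`/`buildPipeline` are hypothetical names; `disableChaining()` is used only to force the unchained variant for a comparison run.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: two named map operators that Flink chains by default, with an option
// to force them into separate tasks for a side-by-side benchmark.
class ChainingCheck {

    static void buildPipeline(StreamExecutionEnvironment env, boolean forceSplit) {
        DataStream<String> raw = env.fromElements("{\"a\":1}", "{\"b\":22}");

        // Placeholder for the question's StringToJson (not shown in the thread).
        DataStream<Integer> parsed = raw
                .map(s -> s.length())
                .name("StringToJson");

        // Placeholder for the question's JsonToAvroSchema.
        SingleOutputStreamOperator<Integer> converted = parsed
                .map(n -> n * 2)
                .name("JsonToAvro");

        if (forceSplit) {
            // Turns off chaining for this operator, so it runs in its own task:
            // records between the two maps are then serialized and shipped
            // between tasks instead of being handed over in-process.
            converted.disableChaining();
        }
        converted.print();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        buildPipeline(env, args.length > 0 && args[0].equals("split"));
        // Task names in the logs and Web UI show which operators ran as one chain.
        env.execute("chaining-check");
    }
}
```

Running it once without arguments and once with `split` gives the two variants to compare.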

I will assume that the mismatch in variable names in your second example (JsonData vs rawDataAsJson) is just a typo.

(In reply to Ivan Yang, 15/05/2020 04:29)

Re: Flink performance tuning on operators

Arvid Heise
Hi Ivan,

To add to the point about chaining: when the map is split into two parts, each object has to be copied as it passes from one operator to the chained operator. Since your objects are very heavy, that copy can take quite long, especially if you don't have a specific serializer configured and rely on Kryo instead.
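Whether Kryo is involved can be checked from the job itself. A minimal sketch, assuming the Flink 1.x DataStream API: `DataStream#getType()` returns the `TypeInformation` Flink derived for a stream, and a `GenericTypeInfo` there means its records are serialized and copied with Kryo. In the split job, the intermediate `JsonObject` stream is the one to check. `KryoCheck`, `usesKryo`, and `OpaqueEvent` are hypothetical names; `OpaqueEvent` stands in for a type that is not a Flink POJO (here because it is not public and has no public no-arg constructor). Calling `env.getConfig().disableGenericTypes()` is a blunter alternative: Flink then throws an exception as soon as it needs a Kryo serializer.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: detect whether a stream's element type falls back to Kryo.
class KryoCheck {

    // Hypothetical opaque type: not public and no public no-arg constructor,
    // so Flink's type extraction cannot treat it as a POJO.
    static final class OpaqueEvent {
        private final String payload;

        OpaqueEvent(String payload) {
            this.payload = payload;
        }
    }

    static boolean usesKryo(DataStream<?> stream) {
        TypeInformation<?> type = stream.getType();
        // GenericTypeInfo is serialized and copied with Kryo.
        return type instanceof GenericTypeInfo;
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        System.out.println(usesKryo(env.fromElements("a", "b")));
        System.out.println(usesKryo(env.fromElements(new OpaqueEvent("x"))));
    }
}
```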

You can avoid the copying in your case by enabling object reuse with `enableObjectReuse` (https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html).
This will directly pass results from upstream to chained downstream operators.
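A minimal sketch of what that setting looks like in code, assuming the DataStream API's `StreamExecutionEnvironment`; the class and method names are illustrative. The flag lives on the job's `ExecutionConfig`, so it applies to the whole job, not just to the two maps.

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: turn on object reuse for the whole job via the ExecutionConfig.
class ObjectReuseSetup {

    static StreamExecutionEnvironment createEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ExecutionConfig config = env.getConfig();
        // Chained operators now receive the upstream object itself instead of a
        // serializer copy. Only safe if no function mutates a record after
        // emitting it or keeps references to input records across calls.
        config.enableObjectReuse();
        return env;
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = createEnvironment();
        System.out.println("object reuse enabled: " + env.getConfig().isObjectReuseEnabled());
    }
}
```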

This feature is disabled by default because some users modify objects directly (non-functional style) or hold local, unmanaged state, so always copying records is a conservative workaround.
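To make that trade-off concrete, here is a toy model in plain Java. It does not use Flink and is not how Flink implements chaining: a two-stage "chain" either gives the downstream stage a deep copy of each record (the default behaviour modelled here) or hands over the emitted object itself (object reuse). The upstream stage keeps one `Event` instance and overwrites it for every input, which is the non-functional style mentioned above, and the downstream stage buffers what it receives, standing in for local, unmanaged state. `ReuseModel`, `Event`, and `MutatingParser` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model of a two-operator chain; NOT Flink code.
final class ReuseModel {

    // Hypothetical wide record (the events in the question have 300+ attributes).
    static final class Event {
        final Map<String, String> attributes = new HashMap<>();

        Event deepCopy() {
            Event copy = new Event();
            copy.attributes.putAll(attributes); // String values are immutable
            return copy;
        }
    }

    // Upstream stage in a non-functional style: one Event instance,
    // overwritten for every input record.
    static final class MutatingParser {
        private final Event scratch = new Event();

        Event parse(String id) {
            scratch.attributes.put("id", id);
            return scratch;
        }
    }

    // Runs the chain; the downstream stage simply buffers what it receives.
    static List<String> run(List<String> inputs, boolean objectReuse) {
        MutatingParser parser = new MutatingParser();
        List<Event> buffered = new ArrayList<>();
        Consumer<Event> downstream = buffered::add;

        for (String in : inputs) {
            Event out = parser.parse(in);
            // Without reuse, copy the record before handing it over.
            downstream.accept(objectReuse ? out : out.deepCopy());
        }

        List<String> ids = new ArrayList<>();
        for (Event e : buffered) {
            ids.add(e.attributes.get("id"));
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> inputs = List.of("a", "b", "c");
        System.out.println("copy:  " + run(inputs, false)); // copy:  [a, b, c]
        System.out.println("reuse: " + run(inputs, true));  // reuse: [c, c, c]
    }
}
```

With copying, the buffer holds three distinct records; with reuse, it holds three references to the same object, which by the end carries only the last value. That is why reuse is opt-in: it is free for pure functions like the two maps in the question, but silently wrong for code written in the style above.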

(In reply to Chesnay Schepler, Fri, May 15, 2020 at 10:17 AM)

--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng