Hi,
We have a Flink job that reads data from an input stream, converts each event from a JSON string to an Avro object, and finally writes Parquet files using StreamingFileSink with an OnCheckpointRollingPolicy of 5 minutes. It is basically a stateless job.

Initially, we used one map operator to convert the JSON string to an Avro object. Inside the map function, it goes from String -> JsonObject -> Avro object:

DataStream<AvroSchema> avroData = data.map(new JsonToAVRO());

We then tried breaking the map operator into two, one for String to JsonObject and another for JsonObject to Avro:

DataStream<JsonObject> JsonData = data.map(new StringToJson());
DataStream<AvroSchema> avroData = rawDataAsJson.map(new JsonToAvroSchema());

The benchmark shows a significant performance hit when breaking it down into two operators. We are trying to understand the Flink internals that explain such a big difference. The setup uses state backend = filesystem and checkpoints to an S3 bucket. Our event object has 300+ attributes.

Thanks,
Ivan

Generally there should be no difference.

Can you check whether the maps are running as a chain (as a single task)? If they are running in a chain, then I would suspect that something else is skewing your results. If not, then the added network/serialization pressure would explain it.

I will assume that the mismatch in variable names in your second example (JsonData vs rawDataAsJson) is just a typo.

On 15/05/2020 04:29, Ivan Yang wrote:
> [...]

Hi Ivan,

Just to add to the point about chaining: when the map is split into two parts, objects need to be copied from one operator to the chained operator. Since your objects are very heavy, that can take quite long, especially if you don't have a specific serializer configured and rely on Kryo.

You can avoid the copying in your case by setting `enableObjectReuse` (https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html). This passes results directly from upstream to chained downstream operators. The feature is disabled by default because some users modify objects directly (non-functional style) or hold local, unmanaged state, so always copying records is the conservative workaround.

On Fri, May 15, 2020 at 10:17 AM Chesnay Schepler <[hidden email]> wrote:
-- Arvid Heise | Senior Java Developer
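
To make the copy-versus-reuse trade-off described above a bit more concrete, here is a minimal plain-Java sketch. It is not Flink code and does not use any Flink API: `Event`, `wideEvent`, `runChain` and `upstreamSeesDownstreamMutation` are hypothetical names invented for illustration, and the shallow map copy only stands in for whatever the configured serializer (Kryo or otherwise) does in a real job. It shows that handing over a copy isolates the upstream step from downstream mutation, while handing over the same instance does not, which is roughly why reuse is opt-in. In an actual job the switch discussed above is set on the execution config, e.g. `env.getConfig().enableObjectReuse()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java analogy (not Flink code) of handing a record between two chained map steps. */
public class ChainedHandoverSketch {

    /** Hypothetical stand-in for a wide event with many attributes. */
    static final class Event {
        final Map<String, String> attributes = new HashMap<>();

        /** Stand-in for the per-record copy made between chained steps. */
        Event deepCopy() {
            Event copy = new Event();
            copy.attributes.putAll(attributes);
            return copy;
        }
    }

    /** Builds an event with the given number of attributes. */
    static Event wideEvent(int attributeCount) {
        Event event = new Event();
        for (int i = 0; i < attributeCount; i++) {
            event.attributes.put("attr" + i, "value" + i);
        }
        return event;
    }

    /** Applies first, then second; copies the intermediate record unless objectReuse is true. */
    static Event runChain(Event input,
                          Function<Event, Event> first,
                          Function<Event, Event> second,
                          boolean objectReuse) {
        Event intermediate = first.apply(input);
        Event handedOver = objectReuse ? intermediate : intermediate.deepCopy();
        return second.apply(handedOver);
    }

    /**
     * The first step keeps a reference to the record it emitted (local, unmanaged state);
     * the second step mutates the record it receives. Returns whether the first step's
     * retained record was affected by that mutation.
     */
    static boolean upstreamSeesDownstreamMutation(boolean objectReuse) {
        Event[] retained = new Event[1];
        Function<Event, Event> first = e -> { retained[0] = e; return e; };
        Function<Event, Event> second = e -> { e.attributes.put("mutated", "true"); return e; };
        runChain(wideEvent(300), first, second, objectReuse);
        return retained[0].attributes.containsKey("mutated");
    }

    public static void main(String[] args) {
        System.out.println("copy:  " + upstreamSeesDownstreamMutation(false));
        System.out.println("reuse: " + upstreamSeesDownstreamMutation(true));
    }
}
```

With the copy in place the upstream step's retained record stays untouched; with reuse, the downstream mutation is visible upstream. If the map functions neither mutate their inputs nor hold on to emitted records, as is typically the case for a pure String -> JsonObject -> Avro conversion, skipping the copy should be safe.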