Flink Batch Performance degradation at scale

Flink Batch Performance degradation at scale

Garrett Barton
I have been moving some old MR and Hive workflows into Flink because I'm enjoying the APIs, and the ease of development is wonderful.  Things have largely worked great until I tried to really scale some of the jobs recently.

I have, for example, one ETL job that reads in about 12B records at a time and does a sort, some simple transformations, validation, a re-partition, and then outputs to a Hive table.
When I built it with the sample set, ~200M records, it worked great: it took maybe a minute and blew through it.

What I have observed is that some kind of saturation is reached, depending on the number of slots, the number of nodes, and the overall size of the data to move.  When I run the 12B set, the first 1B records go through in under 1 minute, really fast.  But there is an extremely sharp drop-off after that: the next 1B might take 15 minutes, and the 1B after that well over an hour.

What I can't find are any obvious indicators or things to look at; everything just grinds to a halt, and I don't think the job would ever actually complete.

Is there something in the design of Flink in batch mode that is perhaps memory bound?  Adding more nodes/tasks does not fix it, it just gets me a little further along.  I'm already running around ~1,400 slots at this point; I'd postulate needing 10,000+ to potentially make the job run, but that's too much of my cluster gone, and I have yet to get Flink stable past 1,500 slots.

Any ideas on where to look, or what to debug?  The GUI is also very cumbersome at this slot count, so other measurement ideas are welcome!

Thank you all.

Re: Flink Batch Performance degradation at scale

Fabian Hueske-2
Hi,

Flink's operators are designed to work in memory as long as possible and spill to disk once the memory budget is exceeded.
Moreover, Flink aims to run programs in a pipelined fashion, such that multiple operators can process data at the same time.
This behavior can make it a bit tricky to analyze the runtime behavior and progress of operators.

It would be interesting to have a look at the execution plan for the program that you are running.
The plan can be obtained from the ExecutionEnvironment by calling env.getExecutionPlan() instead of env.execute().
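For example (a minimal sketch; everything except getExecutionPlan() is assumed from your description):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // ... define the sources, transformations, and sinks of the job as usual ...
    // Instead of env.execute(), dump the optimizer's plan as a JSON string:
    System.out.println(env.getExecutionPlan());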

I would also like to know how you track the progress of the program.
Are you looking at the record counts displayed in the WebUI?

Best,
Fabian

Re: Flink Batch Performance degradation at scale

Garrett Barton
Fabian,

 Thank you for the reply.  Yes, I do watch via the UI; is there another way to see progress through the steps?

I think I just figured it out: the hangup is in the sort phase (ID 4), where 2 slots take all the time.  Looking in the UI, most slots get less than 500MB of data to sort; these two have 6.7GB and 7.3GB each, together about 272M records, and they will run for hours at this point.  Looks like I need to figure out a different partitioning/sort strategy.  I never noticed before because when I run the system at ~1,400 slots I don't use the UI anymore, as it gets unresponsive.  400 slots is painfully slow, but still works.
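
Something like salting the hot key might be an option; a rough sketch (the Tuple3 element type and the salt range of 100 are made up, and partial results per salted key would have to be merged again downstream):

    // Uses org.apache.flink.api.java.tuple.*, org.apache.flink.api.common.functions.MapFunction,
    // and org.apache.flink.api.common.operators.Order.
    // Append a random salt as an extra key field so that one hot (f0, f2) key
    // is spread across up to 100 sort partitions instead of landing on one slot.
    DataSet<Tuple4<String, Long, String, Integer>> salted = input
        .map(new MapFunction<Tuple3<String, Long, String>, Tuple4<String, Long, String, Integer>>() {
            private final Random rnd = new Random();
            @Override
            public Tuple4<String, Long, String, Integer> map(Tuple3<String, Long, String> v) {
                return Tuple4.of(v.f0, v.f1, v.f2, rnd.nextInt(100));
            }
        });
    // Partition on the original key plus the salt, then sort within partitions.
    DataSet<Tuple4<String, Long, String, Integer>> sorted =
        salted.partitionByHash(0, 2, 3).sortPartition(0, Order.ASCENDING);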


The getExecutionPlan() output is very cool! It's also very big, so I've tried to summarize it here in more of a YAML format, as it's on a different network.  Note the parallelism was just set to 10, as I didn't know if that affected the output.  Hopefully I didn't flub a copy-paste step; it looks good to me.

This flow used to be far fewer steps, but as it wasn't scaling I broke it out into all the distinct pieces so I could see where it failed.  Source and sink are both Hive tables.  I wonder if the InputFormat is expected to give more info to seed some of these stat values?

nodes
    id: 6
    type: source
    pact: Data Source
    contents: at CreateInput(ExecutionEnvironment.java:533)
    parallelism: 10
    global_properties:
        name: Partitioning value: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est. Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value: 0
        name: CPU value: 0
        name: Cumulative Network value: 0
        name: Cumulative Disk I/O value: 0
        name: Cumulative CPU value: 0
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 5
    type: pact
    pact: FlatMap
    contents: FlatMap at main()
    parallelism: 10
    predecessors:
        id: 6, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: FlatMap
    global_properties:
        name: Partitioning value: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est. Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value: 0
        name: CPU value: 0
        name: Cumulative Network value: 0
        name: Cumulative Disk I/O value: 0
        name: Cumulative CPU value: 0
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 4
    type: pact
    pact: Sort-Partition
    contents: Sort at main()
    parallelism: 10
    predecessors:
        id: 5, ship_strategy: Hash Partition on [0,2], local_strategy: Sort on [0:ASC,2:ASC,1:ASC], exchange_mode: PIPELINED
    driver_strategy: No-Op
    global_properties:
        name: Partitioning value: HASH_PARTITIONED
        name: Partitioned on value: [0,2]
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: [0:ASC,2:ASC,1:ASC]
        name: Grouping value: [0,2,1]
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est. Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value: 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 3
    type: pact
    pact: GroupReduce
    contents: GroupReduce at first(SortedGrouping.java:210)
    parallelism: 10
    predecessors:
        id: 4, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Sorted Group Reduce
    global_properties:
        name: Partitioning value: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est. Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value: 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 2
    type: pact
    pact: Map
    contents: Map at ()
    parallelism: 10
    predecessors:
        id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Map
    global_properties:
        name: Partitioning value: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est. Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value: 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 1
    type: pact
    pact: Map
    contents: map at main()
    parallelism: 10
    predecessors:
        id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Map
    global_properties:
        name: Partitioning value: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est. Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value: 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 0
    type: sink
    pact: Data Sink
    contents: org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
    parallelism: 10
    predecessors:
        id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Map
    global_properties:
        name: Partitioning value: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est. Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value: 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

Re: Flink Batch Performance degradation at scale

Fabian Hueske-2
Hi Garrett,

data skew might be a reason for the performance degradation.

The plan you shared is pretty simple. The following happens when you run the program:
- The data source starts to read data and pushes the records to the FlatMapFunction. From there, the records are shuffled (using hash partitioning) to the sorter.
- The sorter tasks consume the records and write them into a memory buffer. When the buffer is full, it is sorted and spilled to disk. Once a buffer has been spilled, it is filled again with records, sorted, and spilled.
- The initially fast processing happens because, at the beginning, the sort buffers are empty, so the sorter does not have to wait for them to be sorted or spilled.

The performance of the plan depends (among other things) on the size of the sort buffers. The sort buffers are taken from Flink's managed memory.
Unless you configured something else, 70% of the TaskManager heap memory is reserved as managed memory.
If you use Flink only for batch jobs, I would enable preallocation and off-heap memory (see configuration options [1]). You can also configure a fixed size for the managed memory. The more memory you configure, the more is available for sorting.
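
For example, in flink-conf.yaml (just a sketch; pick either the fraction or the fixed size):

    taskmanager.memory.off-heap: true
    taskmanager.memory.preallocate: true
    taskmanager.memory.fraction: 0.7
    # or, instead of the fraction, a fixed amount of managed memory in MB:
    # taskmanager.memory.size: 4096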

The managed memory of a TM is evenly distributed to all of its processing slots. Hence, having more slots per TM means that each slot has less managed memory (for sorting, joins, etc.).
So many slots are not necessarily good for performance (unless you increase the number of TMs / the memory as well), especially in the case of data skew, when most slots receive only little data and cannot leverage their memory.
If your data is heavily skewed, it might make sense to have fewer slots, such that each slot has more memory for sorting.

Skew also has an effect on downstream operations. In case of skew, some of the sorter tasks are overloaded and cannot accept more data.
Due to the pipelined shuffles, this leads to back pressure that propagates down to the sources.
You can disable pipelining by setting the execution mode on the execution configuration to BATCH [2]. This breaks the pipeline by writing the result of the FlatMap to disk.
This might help if the FlatMap is compute-intensive or filters out many records.
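
In code, that is a one-line change on the ExecutionConfig:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setExecutionMode(ExecutionMode.BATCH); // org.apache.flink.api.common.ExecutionMode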

The data sizes don't sound particularly large, so this should be something that Flink is able to handle.

Btw. you don't need to convert the JSON plan output. You can paste it into the plan visualizer [3].
I would not worry about the missing statistics. The optimizer does not leverage them in its current state.

Best, Fabian


Re: Flink Batch Performance degradation at scale

Garrett Barton
Wow, thank you for the reply; you gave me a lot to look into and mess with. I'll start testing with the various memory options and env settings tomorrow.

BTW, the current Flink cluster is launched like:
yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120

with flink-conf.yaml property overrides of:
# so bigger clusters don't fail to init
akka.ask.timeout: 60s
# so more memory is given to the JVM from the yarn container
containerized.heap-cutoff-ratio: 0.15

So each Flink slot doesn't necessarily get a lot of RAM. You said 70% of the heap goes to managed memory by default, so that's (9200 * 0.85) * 0.70 = 5474 MB per TM, and with 2 slots each slot is sitting with ~2737 MB of usable space.  Would you configure the same overall amount of RAM differently?

Re: Flink Batch Performance degradation at scale

Fabian Hueske-2
That doesn't look like a bad configuration.

I have to correct myself regarding the size of the managed memory. The fraction (70%) is applied to the free memory after TM initialization. This means that the memory for network buffers (and other data structures) is subtracted before the managed memory is allocated.
The actual size of the managed memory is logged in the TM log file during start up.

You could also try to decrease the number of slots per TM to 1 but add more vCores (yarn.containers.vcores), because the sorter runs in multiple threads.
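
With the session command you posted, that might look like this (just a sketch; the vCore count is an example):

    yarn-session.sh -n 700 -s 1 -tm 9200 -jm 5120

with, in flink-conf.yaml:

    yarn.containers.vcores: 2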

Adding a GroupCombineFunction for pre-aggregation (if possible...) would help mitigate the effects of the data skew.
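
A rough sketch of what that could look like (the Tuple3 element type, the key fields, and the summed field are placeholders; whether a combine applies depends on the semantics of your job):

    // Uses org.apache.flink.api.common.functions.GroupCombineFunction and
    // org.apache.flink.util.Collector.
    DataSet<Tuple3<String, Long, String>> preAggregated = input
        .groupBy(0, 2)
        .combineGroup(new GroupCombineFunction<Tuple3<String, Long, String>, Tuple3<String, Long, String>>() {
            @Override
            public void combine(Iterable<Tuple3<String, Long, String>> values,
                                Collector<Tuple3<String, Long, String>> out) {
                // Runs locally, before the hash partitioning: fold each locally
                // available group into one partial record (here: a sum of f1),
                // so hot keys ship far fewer records to the sorters.
                Tuple3<String, Long, String> partial = null;
                for (Tuple3<String, Long, String> v : values) {
                    if (partial == null) {
                        partial = v;
                    } else {
                        partial.f1 += v.f1;
                    }
                }
                out.collect(partial);
            }
        });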
Another thing I'd like to ask: are you adding the partitioner and sorter explicitly to the plan, and if so, why? Usually, the partitioning and sorting are done as part of the GroupReduce.
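
For comparison, a sketch of the two variants (key fields and the reduce function are placeholders):

    // Explicit partition + sort: shows up as separate operators in the plan.
    input.partitionByHash(0, 2)
         .sortPartition(0, Order.ASCENDING);

    // The usual form: partitioning and sorting are planned as part of the GroupReduce.
    input.groupBy(0, 2)
         .sortGroup(1, Order.ASCENDING)
         .reduceGroup(new MyGroupReduceFunction()); // MyGroupReduceFunction is hypothetical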

Best, Fabian

2017-12-06 23:32 GMT+01:00 Garrett Barton <[hidden email]>:
Wow thank you for the reply, you gave me a lot to look into and mess with. I'll start testing with the various memory options and env settings tomorrow.

BTW the current flink cluster is launched like:
yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120

with flink-conf.yaml property overrides of:
# so bigger clusters don't fail to init
akka.ask.timeout: 60s
# so more memory is given to the JVM from the yarn container
containerized.heap-cutoff-ratio: 0.15

So each flink slot doesn't necessarily get a lot of ram, you said 70% of ram goes to the job by default, so that's (9200*0.85)*0.70 = 5474MB.  So each slot is sitting with ~2737MB of usable space.  Would you have a different config for taking overall the same amount of ram?




On Wed, Dec 6, 2017 at 11:49 AM, Fabian Hueske <[hidden email]> wrote:
Hi Garrett,

data skew might be a reason for the performance degradation.

The plan you shared is pretty simple. The following happens you run the program:
- The data source starts to read data and pushes the records to the FlatMapFunction. From there the records are shuffed (using hash-partitioning) to the sorter.
- The sorter tasks consume the records and write them into a memory buffer. When the buffer is full, it is sorted and spilled to disk. When the buffer was spilled, it is filled again with records, sorted, and spilled.
- The initially fast processing happens because at the beginning the sorter is not waiting for buffers to be sorted or spilled because they are empty.

The performance of the plan depends (among other things) on the size of the sort buffers. The sort buffers are taken from Flink's managed memory.
Unless you configured something else, 70% of to the TaskManager heap memory is reserved as managed memory.
If you use Flink only for batch jobs, I would enable preallocation and off-heap memory (see configuration options [1]). You can also configure a fixed size for the managed memory. The more memory you configure, the more is available for sorting.

The managed memory of a TM is evenly distributed to all its processing slots. Hence, having more slots per TM means that each slot has fewer managed memory (for sorting or joins or ...).
So many slots are not necessarily good for performance (unless you increase the number of TMs / memory as well), especially in case of data skew when most slots receive only little data and cannot leverage their memory.
If your data is heavily skewed, it might make sense to have fewer slots such that each slot has more memory for sorting.

Skew has also an effect on downstream operations. In case of skew, some of the sorter tasks are overloaded and cannot accept more data.
Due to the pipelined shuffles, this leads to a back pressure behavior that propagates down to the sources.
You can disable pipelining by setting the execution mode on the execution configuration to BATCH [2]. This will break the pipeline but write the result of the FlatMap to disk.
This might help, if the FlatMap is compute intensive or filters many records.

The data sizes don't sound particular large, so this should be something that Flink should be able to handle.

Btw. you don't need to convert the JSON plan output. You can paste it into the plan visualizer [3].
I would not worry about the missing statistics. The optimizer does not leverage them at the current state.

Best, Fabian

2017-12-06 16:45 GMT+01:00 Garrett Barton <[hidden email]>:
Fabian,

 Thank you for the reply.  Yes I do watch via the ui, is there another way to see progress through the steps?

I think I just figured it out, the hangup is in the sort phase (ID 4) where 2 slots take all the time.  Looking in the UI most slots get less than 500MB of data to sort, these two have 6.7GB and 7.3GB each, together its about 272M records and these will run for hours at this point.  Looks like I need to figure out a different partitioning/sort strategy. I never noticed before because when I run the system at ~1400 slots I don't use the UI anymore as its gets unresponsive.  400 Slots is painfully slow, but still works.


The getEnv output is very cool! Also very big, I've tried to summarize it here in more of a yaml format as its on a different network.  Note the parallelism was just set to 10 as I didn't know if that effected output.  Hopefully I didn't flub a copy paste step, it looks good to me. 


​This flow used to be far fewer steps, but as it wasn't scaling I broke it out into all the distinct pieces so I could see where it failed.​  Source and sink are both Hive tables.  I wonder if the inputformat is expected to give more info to seed some of these stat values?

​nodes
    id: 6
    type: source
    pact: Data Source
    contents: at CreateInput(ExecutionEnvironment.java:533)
    parallelism: 10
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: 0
        name: Cumulative Disk I/O value: 0
        name: Cumulative CPU value: 0
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 5
    type: pact
    pact: FlatMap
    contents: FlatMap at main()
    parallelism: 10
    predecessors:
        id: 6, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: FlatMap
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: 0
        name: Cumulative Disk I/O value: 0
        name: Cumulative CPU value: 0
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 4
    type: pact
    pact: Sort-Partition
    contents: Sort at main()
    parallelism: 10
    predecessors:
        id: 5, ship_strategy: Hash Partition on [0,2] local_strategy: Sort on [0:ASC,2:ASC,1:ASC], exchange_mode: PIPELINED
    driver_strategy: No-Op
    global_properties:
        name: partitioning v: HASH_PARTITIONED
        name: Partitioned on value: [0,2]
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: [0:ASC,2:ASC,1:ASC]
        name: Grouping value: [0,2,1]
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 3
    type: pact
    pact: GroupReduce
    contents: GroupReduce at first(SortedGrouping.java:210)
    parallelism: 10
    predecessors:
        id: 4, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Sorted Group Reduce
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none


    id: 2
    type: pact
    pact: Map
    contents: Map at ()
    parallelism: 10
    predecessors:
        id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Map
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 1
    type: pact
    pact: Map
    contents: map at main()
    parallelism: 10
    predecessors:
        id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Map
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 0
    type: sink
    pact: Data Sink
    contents: org.apache.flink.api.java.jadoop.mapreduce.HadoopOutputFormat
    parallelism: 10
    predecessors:
        id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Map
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none
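For anyone reproducing this: the JSON that the summary above was transcribed from comes straight out of the ExecutionEnvironment. A minimal sketch, assuming a standard DataSet-API job:

import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ... define sources, transformations, and sinks here ...
// Prints the optimizer's JSON plan; the job itself is not executed.
System.out.println(env.getExecutionPlan());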





Re: Flink Batch Performance degradation at scale

Garrett Barton
Thanks for the reply again,

 I'm currently doing runs with:
yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true

When I change the config via setExecutionMode() to BATCH, the sort instantly fails with SortMerger OOM exceptions no matter what memory fraction I choose, even when I set the fraction to 0.95.  The data source part is ridiculously fast though, ~30 seconds!  Disabling batch mode while keeping the other changes shows the same behavior as before; the job's been running for ~20 minutes now.  Does BATCH mode disable spilling to disk, or does BATCH combined with off-heap memory disable spilling to disk?  Is there more documentation on what BATCH mode does under the covers?
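For reference, the mode switch in question is a one-liner on the ExecutionConfig; a minimal sketch, with nothing job-specific assumed:

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// BATCH materializes the data exchange instead of pipelining it,
// decoupling producer and consumer tasks at the cost of extra disk I/O.
env.getConfig().setExecutionMode(ExecutionMode.BATCH);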

As for the flow itself, yes, it used to be a lot smaller.  I broke it out manually by adding the explicit sort/partition steps to see which ones were causing the slowdown; thinking it was my code, I wanted to separate the operations.
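To illustrate the two shapes, here is a sketch inferred from the plan above; the Tuple3 element type, the field indices, and the name 'data' are placeholders for the real job:

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;

// Broken-out shape, matching the plan: an explicit hash partition plus
// per-partition sort feeding the GroupReduce as separate plan nodes.
DataSet<Tuple3<String, String, String>> sorted = data
        .partitionByHash(0, 2)
        .sortPartition(0, Order.ASCENDING)
        .sortPartition(2, Order.ASCENDING)
        .sortPartition(1, Order.ASCENDING);

// Combined shape: groupBy/sortGroup lets the optimizer fold the
// partitioning and sorting into the GroupReduce itself.
DataSet<Tuple3<String, String, String>> reduced = data
        .groupBy(0, 2)
        .sortGroup(1, Order.ASCENDING)
        .first(1);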

Thank you again for your help.

On Thu, Dec 7, 2017 at 4:49 AM, Fabian Hueske <[hidden email]> wrote:
That doesn't look like a bad configuration.

I have to correct myself regarding the size of the managed memory. The fraction (70%) is applied to the free memory after the TM initialization. This means that memory for network buffers (and other data structures) is subtracted before the managed memory is allocated.
The actual size of the managed memory is logged in the TM log file during start up.

You could also try to decrease the number of slots per TM to 1 but add more vCores (yarn.containers.vcores []) because the sorter runs in multiple threads.

Adding a GroupCombineFunction for pre-aggregation (if possible...) would help to mitigate the effects of the data skew.
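A minimal sketch of such a combiner, assuming a hypothetical Tuple2<String, Long> count-per-key shape (the real key and aggregate would differ):

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Intended to pre-aggregate within each subtask before data crosses the
// network, shrinking what the skewed partitions have to absorb.
public class CountCombiner
        implements GroupCombineFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
    @Override
    public void combine(Iterable<Tuple2<String, Long>> values,
                        Collector<Tuple2<String, Long>> out) {
        String key = null;
        long sum = 0;
        for (Tuple2<String, Long> v : values) {
            key = v.f0;   // all values in one invocation share the key
            sum += v.f1;
        }
        out.collect(Tuple2.of(key, sum));
    }
}

It would be applied with something like data.groupBy(0).combineGroup(new CountCombiner()) ahead of the heavier aggregation.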
Another thing I'd like to ask: Are you adding the partitioner and sorter explicitly to the plan and if so why? Usually, the partitioning and sorting is done as part of the GroupReduce.

Best, Fabian

2017-12-06 23:32 GMT+01:00 Garrett Barton <[hidden email]>:
Wow thank you for the reply, you gave me a lot to look into and mess with. I'll start testing with the various memory options and env settings tomorrow.

BTW the current flink cluster is launched like:
yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120

with flink-conf.yaml property overrides of:
# so bigger clusters don't fail to init
akka.ask.timeout: 60s
# so more memory is given to the JVM from the yarn container
containerized.heap-cutoff-ratio: 0.15

So each flink slot doesn't necessarily get a lot of ram, you said 70% of ram goes to the job by default, so that's (9200*0.85)*0.70 = 5474MB.  So each slot is sitting with ~2737MB of usable space.  Would you have a different config for taking overall the same amount of ram?




On Wed, Dec 6, 2017 at 11:49 AM, Fabian Hueske <[hidden email]> wrote:
Hi Garrett,

data skew might be a reason for the performance degradation.

The plan you shared is pretty simple. The following happens when you run the program:
- The data source starts to read data and pushes the records to the FlatMapFunction. From there the records are shuffled (using hash partitioning) to the sorter.
- The sorter tasks consume the records and write them into a memory buffer. When the buffer is full, it is sorted and spilled to disk. Once the buffer has been spilled, it is filled again with records, sorted, and spilled.
- The initial processing is fast because, at the beginning, the sorter is not yet waiting for buffers to be sorted or spilled; they are still empty.

The performance of the plan depends (among other things) on the size of the sort buffers. The sort buffers are taken from Flink's managed memory.
Unless you configured something else, 70% of the TaskManager heap memory is reserved as managed memory.
If you use Flink only for batch jobs, I would enable preallocation and off-heap memory (see configuration options [1]). You can also configure a fixed size for the managed memory. The more memory you configure, the more is available for sorting.

The managed memory of a TM is evenly distributed to all its processing slots. Hence, having more slots per TM means that each slot has fewer managed memory (for sorting or joins or ...).
So many slots are not necessarily good for performance (unless you increase the number of TMs / memory as well), especially in case of data skew when most slots receive only little data and cannot leverage their memory.
If your data is heavily skewed, it might make sense to have fewer slots such that each slot has more memory for sorting.

Skew also has an effect on downstream operations. In case of skew, some of the sorter tasks are overloaded and cannot accept more data.
Due to the pipelined shuffles, this leads to a back pressure behavior that propagates down to the sources.
You can disable pipelining by setting the execution mode on the execution configuration to BATCH [2]. This will break the pipeline but write the result of the FlatMap to disk.
This might help if the FlatMap is compute-intensive or filters many records.

The data sizes don't sound particularly large, so this should be something that Flink is able to handle.

Btw. you don't need to convert the JSON plan output. You can paste it into the plan visualizer [3].
I would not worry about the missing statistics. The optimizer does not leverage them in its current state.

Best, Fabian


Re: Flink Batch Performance degradation at scale

Fabian Hueske-2
Hmm, the OOM sounds like a bug to me. Can you provide the stacktrace?
The managed memory should be divided among all possible consumers. In the case of your simple job, this should just be the sorter.
In fact, I'd try to reduce the fraction to give more memory to the JVM heap (OOM means there was not enough (heap) memory).

Enabling BATCH mode means that the records are not shipped to the sorter in a pipelined fashion but buffered at (and written to the disk of) the sender task.
Once the input has been fully consumed, the data is shipped to the receiver tasks (the sorter). This mode decouples tasks and also reduces the number of network buffers, because fewer connections must be active at the same time.
Here's a link to an internal design document (not sure how up to date it is though...) [1].

Did you try to check whether the problem is caused by data skew?
You could add a MapPartition task instead of the PartitionSorter to count the number of records per partition.
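A minimal sketch of that check, assuming the DataSet API ('data' and 'Record' are placeholders for the actual dataset and its element type):

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// After the same hash partitioning, count how many records land in each
// parallel subtask; heavy skew then shows up directly in the output.
DataSet<Tuple2<Integer, Long>> partitionCounts = data
        .partitionByHash(0, 2)
        .mapPartition(new RichMapPartitionFunction<Record, Tuple2<Integer, Long>>() {
            @Override
            public void mapPartition(Iterable<Record> values,
                                     Collector<Tuple2<Integer, Long>> out) {
                long count = 0;
                for (Record ignored : values) {  // 'Record' is a placeholder type
                    count++;
                }
                out.collect(Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), count));
            }
        });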

Best, Fabian


Re: Flink Batch Performance degradation at scale

Garrett Barton
The stack trace is generated every time with the following settings (I tried different memory fractions):
yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7/0.3/0.1
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true
env.getConfig().setExecutionMode(ExecutionMode.BATCH)

Hand-jammed top of the stack:
java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer memory
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:82)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer memory
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:149)
... lots of netty stack frames


While I observe the taskmanagers, I never see their JVM heaps get high at all.  Mind you, I can't tell which task will blow up and then find its TM in time to see what it looks like, but on each one I do catch, the heap usage is ~150MB/6.16GB (with fraction: 0.1)
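One general note on "Direct buffer memory" OOMs: they come out of the JVM's direct-memory ceiling rather than the heap, which is consistent with the near-empty heaps observed above. If one wanted to raise that ceiling, a hypothetical flink-conf.yaml entry (assuming the standard env.java.opts key and HotSpot's -XX:MaxDirectMemorySize flag) would look like:

# hypothetical value, sized to the observed direct-buffer pressure
env.java.opts: -XX:MaxDirectMemorySize=2g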


Re: Flink Batch Performance degradation at scale

Fabian Hueske-2
Ah, no direct memory buffer...
Can you try to disable off-heap memory?
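(In config terms, and assuming the same Flink 1.x keys quoted throughout this thread, the suggested change is a single flag in flink-conf.yaml:

taskmanager.memory.off-heap: false)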

Reply | Threaded
Open this post in threaded view
|

Re: Flink Batch Performance degradation at scale

Garrett Barton
Running with these settings:
yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: false
taskmanager.memory.preallocate: true
env.getConfig().setExecutionMode(ExecutionMode.BATCH)
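(For context, a rough back-of-the-envelope for these numbers, assuming the fraction applies to the full post-cutoff heap, which overstates it slightly since network buffers are carved out first: 9200 * (1 - 0.15) = 7820MB of JVM heap, 7820 * 0.7 = 5474MB of managed memory per TM, and with -s 2 that is roughly 5474 / 2 = 2737MB of sort buffer per slot.)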

Looks like it's running a little faster than with the original settings, and the sort is not causing an OOM at least.


What do you mean by no direct memory buffer?  The taskmanagers appear to report the correct capacity under the Outside JVM section.

I was googling around and ran into this: https://github.com/netty/netty/issues/6813  It seemed promising, but I don't see -XX:+DisableExplicitGC being added anywhere in the YARN launch_container.sh.
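(One way to verify this from inside the process rather than from launch_container.sh: a tiny hypothetical helper, not part of the job in this thread, that prints the JVM flags the TaskManager actually received:

import java.lang.management.ManagementFactory;

// Hypothetical helper: run (or log the same call from a rich function) inside
// the TM JVM to list its input arguments and confirm whether flags such as
// -XX:+DisableExplicitGC or -XX:MaxDirectMemorySize were actually set.
public class PrintJvmArgs {
    public static void main(String[] args) {
        ManagementFactory.getRuntimeMXBean()
                .getInputArguments()
                .forEach(System.out::println);
    }
})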




Reply | Threaded
Open this post in threaded view
|

Re: Flink Batch Performance degradation at scale

Fabian Hueske-2
Oh, sorry. "Direct memory buffer" was the error message of the OOM.
Direct memory buffers are used when off-heap memory is enabled.
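(This is standard JVM behavior rather than anything Flink-specific; a minimal, self-contained illustration with a hypothetical demo class:

import java.nio.ByteBuffer;

public class DirectBufferDemo {
    public static void main(String[] args) {
        // Heap buffer: allocated on the JVM heap, counted against -Xmx and
        // visible in the TaskManager's heap usage charts.
        ByteBuffer onHeap = ByteBuffer.allocate(16 * 1024 * 1024);
        // Direct buffer: allocated in native memory and capped by
        // -XX:MaxDirectMemorySize. Exhausting that budget throws
        // java.lang.OutOfMemoryError: Direct buffer memory while heap usage
        // can stay low, which matches the "Direct buffer memory" OOM and the
        // ~150MB heap readings reported earlier in the thread.
        ByteBuffer offHeap = ByteBuffer.allocateDirect(16 * 1024 * 1024);
        System.out.println(onHeap.isDirect() + " / " + offHeap.isDirect()); // prints: false / true
    }
})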

2017-12-07 18:56 GMT+01:00 Garrett Barton <[hidden email]>:
Running with these settings:
yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: false
taskmanager.memory.preallocate: true
env.getConfig().setExecutionMode(ExecutionMode.BATCH)​

Looks like its running a little faster than the original settings, sort is not causing OOM at least.


​What do you mean by no direct memory buffer?  The taskmanagers look to report correct capacity under the Outside JVM section.​

Was googling around and ran into this: https://github.com/netty/netty/issues/6813  seemed promising but I dont see -XX:+DisableExplicitGC being added anywhere in the yarn launch_container.sh




On Thu, Dec 7, 2017 at 12:39 PM, Fabian Hueske <[hidden email]> wrote:
Ah, no direct memory buffer...
Can you try to disable off-heap memory?

2017-12-07 18:35 GMT+01:00 Garrett Barton <[hidden email]>:
Stacktrace generates every time with the following settings (tried different memory fractions):
yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7/0.3/0.1
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true
env.getConfig().setExecutionMode(ExecutionMode.BATCH)

Hand Jammed top of the stack:
java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer memory
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getInterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:82)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer memory
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:149)
... lots of netty stuffs


While I observe the taskmanagers I never see their JVM heaps get high at all.  Mind you I cant tell which task will blow and then see its TM in time to see what it looks like.  But each one I do look at the heap usage is ~150MB/6.16GB (with fraction: 0.1)

On Thu, Dec 7, 2017 at 11:59 AM, Fabian Hueske <[hidden email]> wrote:
Hmm, the OOM sounds like a bug to me. Can you provide the stacktrace?
The managed memory should be divided among all possible consumers. In case of your simple job, this should just be Sorter.
In fact, I'd try to reduce the fraction to give more memory to the JVM heap (OOM means there was not enough (heap) memory).

Enabling BATCH mode means that the records are not shipped to the sorter in a pipelined fashion but buffered at (and written to the disk of) the sender task.
Once the input was consumed, the data is shipped to the receiver tasks (the sorter). This mode decouples tasks and also reduces the number of network buffers because fewer connection must be active at the same time.+
Here's a link to an internal design document (not sure how up to date it is though...) [1].

Did you try to check if the problem is cause by data skew?
You could add a MapPartition tasks instead of the PartitionSorter to count the number of records per partition.

Best, Fabian

2017-12-07 16:30 GMT+01:00 Garrett Barton <[hidden email]>:
Thanks for the reply again,

 I'm currently doing runs with:
yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true

When I change the config setExecutionMode() to BATCH, no matter what memory fraction I choose the sort instantly fails with SortMerger OOM exceptions.  Even when I set fraction to 0.95.  The data source part is ridiculously fast though, ~30 seconds!  Disabling batch mode and keeping the other changes looks like to do the same behavior as before, jobs been running for ~20 minutes now.  Does Batch mode disable spilling to disk, or does batch with a combo of off heap disable spilling to disk?  Is there more documentation on what Batch mode does under the covers?

As for the flow itself, yes it used to be a lot smaller, I broke it out manually by adding the sort/partition to see which steps were causing me the slowdown, thinking it was my code, I wanted to separate the operations.

Thank you again for your help.

On Thu, Dec 7, 2017 at 4:49 AM, Fabian Hueske <[hidden email]> wrote:
That doesn't look like a bad configuration.

I have to correct myself regarding the size of the managed memory. The fraction (70%) is applied on the free memory after the TM initialization. This means that memory for network buffers (and other data structures) are subtracted before the managed memory is allocated.
The actual size of the managed memory is logged in the TM log file during start up.

You could also try to decrease the number of slots per TM to 1 but add more vCores (yarn.containers.vcores []) because the sorter runs in multiple threads.

Adding a GroupCombineFunction for pre-aggregation (if possible...) would help to mitigate the effects of the data skew.
Another thing I'd like to ask: Are you adding the partitioner and sorter explicitly to the plan and if so why? Usually, the partitioning and sorting is done as part of the GroupReduce.

Best, Fabian

2017-12-06 23:32 GMT+01:00 Garrett Barton <[hidden email]>:
Wow thank you for the reply, you gave me a lot to look into and mess with. I'll start testing with the various memory options and env settings tomorrow.

BTW the current flink cluster is launched like:
yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120

with flink-conf.yaml property overrides of:
# so bigger clusters don't fail to init
akka.ask.timeout: 60s
# so more memory is given to the JVM from the yarn container
containerized.heap-cutoff-ratio: 0.15

So each flink slot doesn't necessarily get a lot of ram, you said 70% of ram goes to the job by default, so that's (9200*0.85)*0.70 = 5474MB.  So each slot is sitting with ~2737MB of usable space.  Would you have a different config for taking overall the same amount of ram?




On Wed, Dec 6, 2017 at 11:49 AM, Fabian Hueske <[hidden email]> wrote:
Hi Garrett,

data skew might be a reason for the performance degradation.

The plan you shared is pretty simple. The following happens you run the program:
- The data source starts to read data and pushes the records to the FlatMapFunction. From there the records are shuffed (using hash-partitioning) to the sorter.
- The sorter tasks consume the records and write them into a memory buffer. When the buffer is full, it is sorted and spilled to disk. When the buffer was spilled, it is filled again with records, sorted, and spilled.
- The initially fast processing happens because at the beginning the sorter is not waiting for buffers to be sorted or spilled because they are empty.

The performance of the plan depends (among other things) on the size of the sort buffers. The sort buffers are taken from Flink's managed memory.
Unless you configured something else, 70% of to the TaskManager heap memory is reserved as managed memory.
If you use Flink only for batch jobs, I would enable preallocation and off-heap memory (see configuration options [1]). You can also configure a fixed size for the managed memory. The more memory you configure, the more is available for sorting.

The managed memory of a TM is evenly distributed to all its processing slots. Hence, having more slots per TM means that each slot has fewer managed memory (for sorting or joins or ...).
So many slots are not necessarily good for performance (unless you increase the number of TMs / memory as well), especially in case of data skew when most slots receive only little data and cannot leverage their memory.
If your data is heavily skewed, it might make sense to have fewer slots such that each slot has more memory for sorting.

Skew has also an effect on downstream operations. In case of skew, some of the sorter tasks are overloaded and cannot accept more data.
Due to the pipelined shuffles, this leads to a back pressure behavior that propagates down to the sources.
You can disable pipelining by setting the execution mode on the execution configuration to BATCH [2]. This will break the pipeline but write the result of the FlatMap to disk.
This might help, if the FlatMap is compute intensive or filters many records.

The data sizes don't sound particular large, so this should be something that Flink should be able to handle.

Btw. you don't need to convert the JSON plan output. You can paste it into the plan visualizer [3].
I would not worry about the missing statistics. The optimizer does not leverage them at the current state.

Best, Fabian

2017-12-06 16:45 GMT+01:00 Garrett Barton <[hidden email]>:
Fabian,

 Thank you for the reply.  Yes I do watch via the ui, is there another way to see progress through the steps?

I think I just figured it out, the hangup is in the sort phase (ID 4) where 2 slots take all the time.  Looking in the UI most slots get less than 500MB of data to sort, these two have 6.7GB and 7.3GB each, together its about 272M records and these will run for hours at this point.  Looks like I need to figure out a different partitioning/sort strategy. I never noticed before because when I run the system at ~1400 slots I don't use the UI anymore as its gets unresponsive.  400 Slots is painfully slow, but still works.


The getEnv output is very cool! Also very big, I've tried to summarize it here in more of a yaml format as its on a different network.  Note the parallelism was just set to 10 as I didn't know if that effected output.  Hopefully I didn't flub a copy paste step, it looks good to me. 


​This flow used to be far fewer steps, but as it wasn't scaling I broke it out into all the distinct pieces so I could see where it failed.​  Source and sink are both Hive tables.  I wonder if the inputformat is expected to give more info to seed some of these stat values?

​nodes
    id: 6
    type: source
    pact: Data Source
    contents: at CreateInput(ExecutionEnvironment.java:533)
    parallelism: 10
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: 0
        name: Cumulative Disk I/O value: 0
        name: Cumulative CPU value: 0
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 5
    type: pact
    pact: FlatMap
    contents: FlatMap at main()
    parallelism: 10
    predecessors:
        id: 6, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: FlatMap
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: 0
        name: Cumulative Disk I/O value: 0
        name: Cumulative CPU value: 0
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 4
    type: pact
    pact: Sort-Partition
    contents: Sort at main()
    parallelism: 10
    predecessors:
        id: 5, ship_strategy: Hash Partition on [0,2] local_strategy: Sort on [0:ASC,2:ASC,1:ASC], exchange_mode: PIPELINED
    driver_strategy: No-Op
    global_properties:
        name: partitioning v: HASH_PARTITIONED
        name: Partitioned on value: [0,2]
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: [0:ASC,2:ASC,1:ASC]
        name: Grouping value: [0,2,1]
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 3
    type: pact
    pact: GroupReduce
    contents: GroupReduce at first(SortedGrouping.java:210)
    parallelism: 10
    predecessors:
        id: 4, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Sorted Group Reduce
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none


    id: 2
    type: pact
    pact: Map
    contents: Map at ()
    parallelism: 10
    predecessors:
        id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Map
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 1
    type: pact
    pact: Map
    contents: map at main()
    parallelism: 10
    predecessors:
        id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Map
    global_properties:
        name: partitioning value: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est. Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value: 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 0
    type: sink
    pact: Data Sink
    contents: org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
    parallelism: 10
    predecessors:
        id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Map
    global_properties:
        name: partitioning value: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est. Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value: 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none
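
For reference, the chain in this plan (hash partition on fields 0 and 2, sort on [0,2,1], first record per group, two chained maps, Hadoop output) is the shape a DataSet program along the following lines would produce. This is a minimal sketch with placeholder types and identity functions, since the real ones are not visible in the plan:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class PlanShape {

    // Identity stand-in for the real transformation/validation maps (ids 2 and 1).
    static class PassThrough implements
            MapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>> {
        @Override
        public Tuple3<String, String, String> map(Tuple3<String, String, String> t) {
            return t;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source standing in for the real input, which is not part of this excerpt.
        DataSet<Tuple3<String, String, String>> input = env.fromElements(
                Tuple3.of("a", "1", "k"), Tuple3.of("a", "2", "k"));

        input.groupBy(0, 2)                  // Hash Partition on [0,2]
             .sortGroup(1, Order.ASCENDING)  // Sort on [0:ASC,2:ASC,1:ASC]
             .first(1)                       // GroupReduce at first(SortedGrouping.java)
             .map(new PassThrough())         // Map (id 2)
             .map(new PassThrough())         // Map (id 1)
             .print();                       // the real job writes through a HadoopOutputFormat sink
    }
}

The parallelism of 10 and the PIPELINED exchange modes in the dump come from the environment configuration rather than from the program itself.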




Reply | Threaded
Open this post in threaded view
|

Re: Flink Batch Performance degradation at scale

Greg Hogan
In reply to this post by Garrett Barton
Hi Garrett,

In the Web UI, when viewing a job under overview / subtasks, selecting the checkbox "Aggregate task statistics by TaskManager" will reduce the number of displayed rows (though in your case only by half).

The following documents profiling a Flink job with Java Flight Recorder: 
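
One way to do that, as a sketch: pass the JFR flags to the Flink JVMs through env.java.opts in flink-conf.yaml. The flags below are the Oracle JDK 8 form; on JDK 11+ Flight Recorder no longer requires unlocking commercial features.

env.java.opts: -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=duration=10m,filename=/tmp/flink-job.jfr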

Are your functions allocating Java collections? This is a common cause of poor performance. Also, Flink types are much faster than Kryo / GenericType.
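
For illustration, a hypothetical map function that applies both points: it emits a concrete Tuple2 type (so Flink's own serializers are used instead of the Kryo/GenericType fallback) and reuses one output object instead of allocating per record:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical parser. Reusing the output tuple is safe in the default
// object-reuse mode, where Flink copies records between chained operators.
public class ParseLine implements MapFunction<String, Tuple2<String, Long>> {

    private final Tuple2<String, Long> reuse = new Tuple2<>();

    @Override
    public Tuple2<String, Long> map(String line) {
        int sep = line.indexOf('\t');
        reuse.f0 = line.substring(0, sep);
        reuse.f1 = Long.parseLong(line.substring(sep + 1));
        return reuse;    // no per-record allocation
    }
}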

A JobManager coordinating hundreds of TaskManagers / TaskManager slots may require more than 5120 MB of heap; I’ve experienced very poor performance when this memory is too low. On the other hand, a TaskManager allocation of 9200 MB seems much too high for 2 slots when user functions are memory bound. If your data exceeds memory it will be spilled to disk no matter how high the TM allocation, so you are better off letting the OS manage the spilled data and prefetch.
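
Concretely, that might look like the following flink-conf.yaml sketch; the key names are from the Flink 1.3/1.4 era and the numbers are purely illustrative, not a recommendation for this specific job:

jobmanager.heap.mb: 16384          # generous JM heap when coordinating ~1,400 slots
taskmanager.heap.mb: 4096          # leaner TM heap; leave RAM to the OS page cache
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.fraction: 0.7   # share of TM heap reserved as Flink managed memory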

The Gelly algorithms process trillions of records per hour on a system of your scale, so Flink is certainly capable of significantly better throughput.

Greg

