Understanding blocking behavior


Understanding blocking behavior

Hailu, Andreas

Hi folks, I’m trying to get a better understanding of which operations result in blocked partitions. I’ve got a batch-processing job that reads from two sources and then performs a series of Maps/Filters/CoGroups, all with the same parallelism, to create a final DataSet that is written to two different Sinks.

 

Which Sink a record in the DataSet is written to depends on the record’s properties, so we use a Map + Filter operation to select just the records intended for each Sink. The latter portion of the graph looks like this:

 

DataSet -> Map + FilterA (with parallelism P) -> SinkA (with parallelism X)

DataSet -> Map + FilterB (with parallelism P) -> SinkB (with parallelism P-X)
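Flink specifics aside, the routing above amounts to two independent filters over the same dataset, so every record lands in exactly one of the two outputs. A plain-Java sketch of that split (Rec and its type field are hypothetical stand-ins for our record type):

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the job's record type: "type" decides the sink.
record Rec(String type, String payload) {}

public class SinkRouting {
    // Map + FilterA: keep only the records bound for SinkA.
    static List<Rec> forSinkA(List<Rec> in) {
        return in.stream().filter(r -> r.type().equals("A")).collect(Collectors.toList());
    }

    // Map + FilterB: keep everything else, bound for SinkB.
    static List<Rec> forSinkB(List<Rec> in) {
        return in.stream().filter(r -> !r.type().equals("A")).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Rec> data = List.of(new Rec("A", "x"), new Rec("B", "y"), new Rec("A", "z"));
        // The two outputs partition the input: 2 records for SinkA, 1 for SinkB.
        System.out.println(forSinkA(data).size() + " " + forSinkB(data).size()); // 2 1
    }
}
```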

 

Parallelisms for the output into SinkA and SinkB are different from the parallelism used in the Map + Filter operations in order to control the total number of output files. What I observe is that all of the records must first be sent to the Map + Filter operators; only once all records have been received do the Sinks begin to output records. This shows in the Flink Dashboard as the Sinks remaining in the ‘CREATED’ state while the Map + Filter operators are ‘RUNNING’. At scale, where the DataSet may contain billions of records, this ends up taking hours. Ideally, records would be streamed through to the Sinks as they pass through the Map + Filter.

 

Is this blocking behavior due to the fact that the Map + Filter operators must redistribute the records as they move to an operator with lower parallelism?

 

____________

 

Andreas Hailu

Data Lake Engineering | Goldman Sachs

 




Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices

Re: Understanding blocking behavior

Jaffe, Julian

Hey Andreas,

 

Have a read through https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_execution_mode.html#task-scheduling-and-network-shuffle, in particular the BATCH Execution Mode section. Your intuition is mostly correct: because the rebalancing prevents your operators from being chained, downstream tasks in batch mode will not begin processing data until the upstream tasks have finished all of their processing. If you can forgo the higher resiliency and lower resource requirements of executing in batch mode, you could try running your pipeline in streaming mode over the bounded data.

 

Julian

 

From: "Hailu, Andreas [Engineering]" <[hidden email]>
Date: Tuesday, February 16, 2021 at 2:00 PM
To: "[hidden email]" <[hidden email]>
Subject: Understanding blocking behavior

 



Re: Understanding blocking behavior

Arvid Heise-4
Hi Andreas,

Julian already offered a good explanation, so here is one possible solution: you could try to run the whole first sub-pipeline with parallelism X and the second with P-X. However, most likely you need to run with P > X to finish in time.

Another way is to use the DataStream API (your program is not doing any aggregation/join, so streaming is indeed a good fit) with STREAMING mode (the only execution mode for DataStream in Flink < 1.12). There, all tasks are active all the time and records are streamed through as you expect. Since we plan to phase out the DataSet API eventually, it's also the more future-proof solution.
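A minimal wiring sketch of that DataStream variant, assuming Flink >= 1.12 (it needs a Flink dependency to compile, and Record, MySource, EnrichA/EnrichB, sinkA/sinkB, p, and x are hypothetical placeholders for the job's own classes and settings):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// STREAMING mode keeps every task running and pipelines records through the
// shuffle instead of materializing blocking intermediate partitions.
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

DataStream<Record> records = env.addSource(new MySource());

records.map(new EnrichA()).filter(Record::isTypeA).setParallelism(p)
       .addSink(sinkA).setParallelism(x);

records.map(new EnrichB()).filter(r -> !r.isTypeA()).setParallelism(p)
       .addSink(sinkB).setParallelism(p - x);

env.execute("routing-job");
```

The rebalance between the filters and the sinks still exists because the parallelisms differ, but in STREAMING mode it is pipelined rather than blocking, so the sinks start emitting as soon as records arrive.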

Best,

Arvid

On Tue, Feb 16, 2021 at 11:37 PM Jaffe, Julian <[hidden email]> wrote:
