Hi folks,

I’m trying to get a better understanding of what operations result in blocked partitions. I’ve got a batch-processing job that reads from two sources and then performs a series of Maps/Filters/CoGroups, all with the same parallelism, to create a final DataSet that is written to two different Sinks. Which Sink a record in the DataSet is written to depends on the record’s properties, so we use a Map + Filter operation to pull just the desired records for each Sink.
The latter portion of the graph looks like this:

DataSet -> Map + FilterA (with parallelism P) -> SinkA (with parallelism X)
DataSet -> Map + FilterB (with parallelism P) -> SinkB (with parallelism P-X)

The parallelisms for the output into SinkA and SinkB are different from the parallelism used in the Map + Filter operations in order to control the total number of output files.

What I observe is that all of the records must first be sent to the Map + Filter operators, and only once all of the records have been received do the Sinks begin to output records. This shows in the Flink Dashboard as the Sinks remaining in the ‘CREATED’ state while the Map + Filter operators are ‘RUNNING’. At scale, where the DataSet may contain billions of records, this ends up taking hours. Ideally, the records would be streamed through to the Sinks as they pass through the Map + Filter. Is this blocking behavior due to the fact that the Map + Filter operators must redistribute the records as they move to an operator with a lower parallelism?

____________
Andreas Hailu
Data Lake Engineering
Goldman Sachs
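(For concreteness, a minimal sketch of the fan-out shape described above, written against the DataSet API. The record type, routing logic, parallelism values, and output paths are stand-ins and do not come from the actual job.)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DataSetFanOutSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        final int p = 4; // parallelism of the Map + Filter operators (illustrative)
        final int x = 1; // parallelism of SinkA; SinkB gets the remaining p - x

        // Stand-in for the DataSet produced by the upstream Maps/Filters/CoGroups.
        DataSet<Tuple2<String, Integer>> records = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("c", 3), Tuple2.of("d", 4));

        // Map + FilterA: keep only the records destined for SinkA (here: even values).
        records.map(r -> Tuple2.of(r.f0.toUpperCase(), r.f1))
               .returns(Types.TUPLE(Types.STRING, Types.INT))
               .setParallelism(p)
               .filter(r -> r.f1 % 2 == 0).setParallelism(p)
               .writeAsText("/tmp/sinkA").setParallelism(x);

        // Map + FilterB: the remaining records go to SinkB at parallelism p - x.
        records.map(r -> Tuple2.of(r.f0.toLowerCase(), r.f1))
               .returns(Types.TUPLE(Types.STRING, Types.INT))
               .setParallelism(p)
               .filter(r -> r.f1 % 2 != 0).setParallelism(p)
               .writeAsText("/tmp/sinkB").setParallelism(p - x);

        env.execute("DataSet fan-out sketch");
    }
}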
Hey Andreas,

Have a read through https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_execution_mode.html#task-scheduling-and-network-shuffle and in particular the BATCH Execution Mode section. Your intuition is mostly correct: because your operators can’t be chained due to the rebalancing, if you execute your pipeline in batch mode, downstream tasks will not begin processing data until the upstream tasks have finished all of their processing.

If you can forgo the higher resiliency and lower resource requirements of executing in batch mode, you could try running your pipeline in streaming mode over bounded data.

Julian
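(For reference, with Flink 1.12+ the execution mode of a DataStream program can be selected in the job itself. A minimal sketch, using a trivial stand-in pipeline rather than the job from this thread:)

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // BATCH: blocking shuffles wherever the graph can't be chained (e.g. across a
        // rebalance into lower-parallelism sinks), so downstream tasks only start once
        // the upstream tasks have produced all of their data.
        // STREAMING: pipelined shuffles; all tasks run concurrently and records flow
        // through to the sinks as they are produced.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        // A bounded input still works in STREAMING mode; the job simply finishes when
        // the source is exhausted.
        env.fromElements(1, 2, 3, 4)
           .filter(i -> i % 2 == 0)
           .print();

        env.execute("execution mode sketch");
    }
}

(The same choice can also be made at submission time through the execution.runtime-mode configuration option.)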
Hi Andreas,

Julian already offered a good explanation, so here is one possible solution: you could try to run the whole first sub-pipeline with parallelism X and the second with P-X. However, you most likely need to run with P > X to finish in time.

Another way is to use the DataStream API (your program is not doing any aggregation/join, so streaming is indeed a good fit) and use STREAMING mode (the only execution mode for DataStream in Flink < 1.12). There, all tasks are active all the time and records are streamed through as you expect. Since we plan to phase out the DataSet API eventually, it’s also the more future-proof solution.

Best,

Arvid
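(A rough sketch of what the DataStream version of the fan-out could look like, again with stand-in types, routing logic, and parallelism values, and with print() standing in for the real file sinks:)

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamFanOutSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // STREAMING is the default for DataStream programs; set explicitly for clarity.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        final int p = 4; // parallelism of the Map + Filter operators (illustrative)
        final int x = 1; // parallelism of sink A; sink B gets the remaining p - x

        // Stand-in for the bounded stream produced by the upstream operators.
        DataStream<Tuple2<String, Integer>> records = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("c", 3), Tuple2.of("d", 4));

        // Records destined for sink A (here: even values). With pipelined shuffles they
        // reach the sink as soon as they pass the filter, even across the parallelism change.
        records.filter(r -> r.f1 % 2 == 0).setParallelism(p)
               .map(r -> "A: " + r.f0 + "," + r.f1).returns(Types.STRING).setParallelism(p)
               .print().setParallelism(x);

        // Records destined for sink B, written at parallelism p - x.
        records.filter(r -> r.f1 % 2 != 0).setParallelism(p)
               .map(r -> "B: " + r.f0 + "," + r.f1).returns(Types.STRING).setParallelism(p)
               .print().setParallelism(p - x);

        env.execute("DataStream fan-out sketch");
    }
}

(Because the shuffles are pipelined here, the sinks are RUNNING from the start and emit records as soon as they arrive, instead of waiting for all of the Map + Filter tasks to finish.)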