Sink Parallelism

Sink Parallelism

Ravinder Kaur
Hello All,

Considering the following streaming dataflow of the WordCount example, I want to understand how the Sink is parallelised.


Source --> flatMap --> groupBy(), sum() --> Sink
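
For reference, here is roughly the job I mean, sketched in the Java DataSet API; the input and output paths are placeholders:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class WordCount {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<String, Integer>> counts = env
                .readTextFile("hdfs:///input/text")      // Source (placeholder path)
                .flatMap(new Tokenizer())                // emit (word, 1) per word
                .groupBy(0)                              // group by the String field
                .sum(1);                                 // sum the Integer field

            counts.writeAsText("hdfs:///output/counts"); // Sink (placeholder path)
            env.execute("WordCount");
        }

        public static final class Tokenizer
                implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                for (String word : line.toLowerCase().split("\\W+")) {
                    if (!word.isEmpty()) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            }
        }
    }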


If I set the parallelism at runtime using -p, as shown here https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-taskmanager-processing-slots, I want to understand how the Sink runs in parallel and how the global result is distributed.

As far as I understand, groupBy(0) is applied to the Tuple2<String, Integer> records emitted by the flatMap function, which groups them by the String value, and sum(1) aggregates the Integer value to get the count.

That means the streams will be redistributed so that tuples with the same String value are sent to one taskmanager, and the Sink step should write the results to the specified path. If the Sink step is also parallelised, each taskmanager should emit a chunk, and these chunks put together must form the global result.

But looking at the pictorial representation, it seems that each task slot runs a copy of the streaming dataflow, performs the operations on the chunk of data it gets, and outputs the result. If that were the case, the global result would contain duplicate strings and would be wrong.

Could one of you kindly clarify what exactly happens?

Kind Regards,
Ravinder Kaur



Re: Sink Parallelism

Chesnay Schepler
The picture you reference does not really show how dataflows are connected.
For a better picture, visit this link:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows

Let me know if this doesn't answer your question.

Re: Sink Parallelism

Ravinder Kaur
Hello Chesnay,

Thank you for the reply. According to https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows, if I set -p = 2 then the sink will also have 2 sink subtasks, and the final result will end up in 2 stream partitions, or say 2 chunks; combining them gives the global result of the WordCount on the input dataset. And if I have 2 taskmanagers with one task slot each, these 2 chunks are saved on 2 machines in the end.
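
To my understanding, this is visible directly in a file sink. Continuing the sketch above (the output path is again a placeholder):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2); // same effect as submitting with "bin/flink run -p 2 ..."

    // ... flatMap / groupBy(0) / sum(1) as before ...

    // With parallelism 2 the sink runs as 2 subtasks, and writeAsText()
    // creates the output path as a directory holding one file per subtask
    // (named "1" and "2", as far as I understand). Because groupBy()
    // partitions by key, each distinct word lands in exactly one of the
    // two files, so concatenating them gives the global result.
    counts.writeAsText("/output/counts");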

I have attached an image of my understanding, worked out on an example WordCount with -p = 4. Could you also explain how the communication among taskmanagers happens while redistributing the streams, and how tuples with the same key end up in one taskmanager? Basically, the implementation of groupBy across multiple taskmanagers.

Thanks,
Ravinder Kaur

Attachment: 20160419_165649[1].jpg (2M)
Re: Sink Parallelism

Fabian Hueske-2
Hi Ravinder,

Your drawing is pretty much correct (Flink will inject a combiner between flatMap and reduce, which locally combines records with the same key).
The partitioning between flatMap and reduce is done with hash partitioning by default. However, you can also define a custom partitioner to control how records are distributed.
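
To sketch what such a custom partitioner could look like (the first-letter rule below is purely illustrative, not the default behaviour):

    import org.apache.flink.api.common.functions.Partitioner;

    // The default routing is essentially target = hash(key) % numPartitions,
    // which is why all records with the same key reach the same reduce subtask.
    // A custom partitioner replaces that routing decision:
    public class FirstLetterPartitioner implements Partitioner<String> {
        @Override
        public int partition(String key, int numPartitions) {
            // Route words by their first character instead of the key's hash.
            return Character.toLowerCase(key.charAt(0)) % numPartitions;
        }
    }

    // Applied to field 0 (the word) of the (word, count) tuples:
    // words.partitionCustom(new FirstLetterPartitioner(), 0);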

Best, Fabian

Re: Sink Parallelism

Ravinder Kaur
Hi Fabian,

Thank you for the explanation. Could you also explain how keyBy() works? I assume it works the same as groupBy(), but in streaming mode, since the data is unbounded, all elements that arrive in the first window are grouped/partitioned by key and aggregated, and so on, until no more stream elements are left. The global result then holds the aggregated key/value pairs.

Kind Regards,
Ravinder Kaur 



Re: Sink Parallelism

Fabian Hueske-2
In batch / DataSet programs, groupBy() is executed by partitioning the data (usually hash partitioning) and sorting each partition to group all elements with the same key.
keyBy() in DataStream programs also partitions the data and results in a KeyedStream. The KeyedStream carries information about the partitioning, which is used by subsequent operations that need to hold state, such as windows or other operators that use partitioned state. So keyBy() by itself does not group or aggregate data; it only partitions the stream and preserves the partitioning information for the following operators.
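
To make that concrete, a small streaming sketch (reusing the Tokenizer from the batch example; the socket source is just a placeholder): keyBy(0) alone triggers no computation, the window after it performs the grouped aggregation on keyed state.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class StreamingWordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple2<String, Integer>> counts = env
                .socketTextStream("localhost", 9999) // unbounded source (placeholder)
                .flatMap(new WordCount.Tokenizer())  // same Tokenizer as the batch job
                .keyBy(0)                            // partitions only; no aggregation yet
                .timeWindow(Time.seconds(5))         // the window does the actual grouping
                .sum(1);                             // uses partitioned (keyed) state

            counts.print();
            env.execute("Streaming WordCount");
        }
    }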

Best, Fabian
