Dynamic partitioner for Flink based on incoming load


Dynamic partitioner for Flink based on incoming load

Alexander Filipchik

Hello!

We are working on a Flink Streaming job that reads data from multiple Kafka topics and writes it to a DFS. We are using StreamingFileSink with a custom implementation for the GCS FS, and it generates a lot of files because the streams are partitioned across many sink subtasks. Ideally we would have at most one file per Kafka topic per interval. We also have some heavy topics and some pretty light ones, so the solution should also be smart enough to utilize resources efficiently.

I was thinking we could partition based on how much data was ingested in the last minute or so, to make sure messages from the same topic are routed to the same file (or a minimal number of files) when there are enough resources to do so. Think bin packing.

Is this a good idea? Is there a built-in way to achieve it? If not, is there a way to push state into the partitioner (or even into the Kafka client, to repartition at the source)? I was thinking I could run a side stream that calculates data volumes and then broadcast it into the main stream so the partitioner can make a decision, but it feels a bit complex.
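For concreteness, here is a rough sketch of the broadcast variant (the Record type, the stats side stream, and the MAX_BYTES_PER_FILE threshold are all made up for illustration, not actual code we have):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class VolumeAwareRouting {

    // Broadcast state: topic name -> observed bytes per minute, published
    // periodically by the side stream that computes volumes.
    static final MapStateDescriptor<String, Long> STATS =
            new MapStateDescriptor<>("topic-stats", Types.STRING, Types.LONG);

    static final long MAX_BYTES_PER_FILE = 128L * 1024 * 1024; // tuning knob

    // Stand-in for whatever the job's element type actually is.
    public static class Record {
        public String topic;
        public byte[] payload;
    }

    public static DataStream<Tuple2<String, Record>> route(
            DataStream<Record> main, DataStream<Tuple2<String, Long>> stats) {

        return main
            .connect(stats.broadcast(STATS))
            .process(new BroadcastProcessFunction<Record, Tuple2<String, Long>, Tuple2<String, Record>>() {

                @Override
                public void processElement(Record r, ReadOnlyContext ctx,
                        Collector<Tuple2<String, Record>> out) throws Exception {
                    Long bytesPerMin = ctx.getBroadcastState(STATS).get(r.topic);
                    // Light topics keep a single routing key; heavy topics are
                    // spread over several sub-keys (crude bin packing).
                    int subKeys = bytesPerMin == null
                            ? 1
                            : (int) Math.max(1, bytesPerMin / MAX_BYTES_PER_FILE);
                    int slot = Math.floorMod(r.payload.length, subKeys);
                    out.collect(Tuple2.of(r.topic + "-" + slot, r));
                }

                @Override
                public void processBroadcastElement(Tuple2<String, Long> stat,
                        Context ctx, Collector<Tuple2<String, Record>> out) throws Exception {
                    ctx.getBroadcastState(STATS).put(stat.f0, stat.f1);
                }
            });
    }
}

Downstream I would keyBy the derived key (t -> t.f0) before the sink, so each derived key, and hence each open file, stays on one subtask.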

Another way would be to modify the Kafka client to track messages per topic and make the decision at that layer.

Am I on the right path?

Thank you


Re: Dynamic partitioner for Flink based on incoming load

Seth Wiesman
You can achieve this in Flink 1.10 using the StreamingFileSink.

I’d also like to note that Flink 1.11 (which is currently going through release testing and should be available imminently) has support for exactly this functionality in the Table API:

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html
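In 1.10 it would look roughly like the sketch below: bucket by the Kafka topic so each topic gets its own directory, and let the rolling policy cap part-file size and age. The Event type, its getTopic() accessor, and the GCS path are placeholders, not the exact code:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// Stand-in for however the job carries the source topic on each element.
class Event {
    String topic;
    String body;
    String getTopic() { return topic; }
    @Override public String toString() { return body; }
}

// One bucket (directory) per Kafka topic.
class TopicBucketAssigner implements BucketAssigner<Event, String> {
    @Override
    public String getBucketId(Event element, Context context) {
        return element.getTopic();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

class TopicSink {
    static StreamingFileSink<Event> create() {
        return StreamingFileSink
            .forRowFormat(new Path("gs://my-bucket/output"),
                          new SimpleStringEncoder<Event>("UTF-8"))
            .withBucketAssigner(new TopicBucketAssigner())
            .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(60_000L)        // roll at least once a minute
                .withMaxPartSize(128L * 1024 * 1024)  // ...or at 128 MB, whichever comes first
                .build())
            .build();
    }
}

Note that each sink subtask that receives data for a topic still opens its own part file in that bucket, so how you key the stream upstream determines how many files per topic you end up with.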



Re: Dynamic partitioner for Flink based on incoming load

Alexander Filipchik
Maybe I'm misreading the documentation, but:
"Data within the partition directories are split into part files. Each partition will contain at least one part file for each subtask of the sink that has received data for that partition."

So that is at least one part file per subtask per partition. I'm trying to figure out how to dynamically adjust which subtask gets the data, to minimize the number of subtasks writing into a specific partition.

Alex


Re: Dynamic partitioner for Flink based on incoming load

Kostas Kloudas
Hi Alexander,

Routing of input data in Flink can be done through keying, and this can guarantee collocation constraints. This means that you can send two records to the same node by giving them the same key, e.g. the topic name. Keep in mind that elements with different keys do not necessarily go to different nodes, as keys are assigned to nodes by hashing, effectively at random.

Given this, you could initially key by topic, so that all records of a topic go to the same node. This node computes statistics about the topic, e.g. elements/sec (t), and based on thresholds assigns a new key to each record, e.g. TOPIC-1 if t < 1000, TOPIC-2 if t >= 1000 && t < 2000, etc., and re-keys. This will not guarantee that TOPIC-1 and TOPIC-2 go to different machines, but the probability of that happening increases with the parallelism of your job. Finally, with your bucket assigner and the rolling policy you can redirect the elements to the same bucket, e.g. TOPIC, and tune how many part files you will have based on part-file size and/or time.
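A rough sketch of that two-step keying (the Event type, the one-second rate bookkeeping, and the 1000 elements/sec band width are simplified placeholders):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TwoStepKeying {

    public static class Event {
        public String topic;
    }

    // Step 1: key by topic so one node sees all of a topic's traffic and can
    // measure its rate; emit (derived key, record) based on rate bands.
    public static class RateBandRekey
            extends KeyedProcessFunction<String, Event, Tuple2<String, Event>> {

        private transient ValueState<Long> windowStart;
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            windowStart = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("window-start", Types.LONG));
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void processElement(Event e, Context ctx,
                Collector<Tuple2<String, Event>> out) throws Exception {
            long now = ctx.timerService().currentProcessingTime();
            Long start = windowStart.value();
            if (start == null || now - start >= 1000) { // new one-second window
                windowStart.update(now);
                count.update(0L);
            }
            long c = (count.value() == null ? 0L : count.value()) + 1;
            count.update(c);

            // Band the observed elements/sec: < 1000 -> TOPIC-1,
            // 1000..1999 -> TOPIC-2, and so on.
            int band = (int) (c / 1000) + 1;
            out.collect(Tuple2.of(e.topic + "-" + band, e));
        }
    }

    public static DataStream<Tuple2<String, Event>> rekey(DataStream<Event> events) {
        return events
                .keyBy(e -> e.topic)       // step 1: collocate each topic
                .process(new RateBandRekey());
        // Step 2 happens downstream: keyBy(t -> t.f0) before the sink, with
        // the bucket assigner mapping TOPIC-1, TOPIC-2, ... back to TOPIC.
    }
}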

Will this help you with your use-case?

Cheers,
Kostas

Re: Dynamic partitioner for Flink based on incoming load

Alexander Filipchik
This will mean 2 shuffles, and 1 node might become a bottleneck if 1 topic has too much data? Is there a way to avoid shuffling altogether (or do only 1 shuffle) and avoid a situation where 1 node becomes a hotspot?

Alex


Re: Dynamic partitioner for Flink based on incoming load

rmetzger0
> This will mean 2 shuffles, and 1 node might become a bottleneck if 1 topic has too much data?

Yes

> Is there a way to avoid shuffling altogether (or do only 1 shuffle) and avoid a situation where 1 node becomes a hotspot?

Do you know the amount of data per Kafka topic beforehand, or does this have to be dynamic?

