stream partitioning to avoid network overhead


Karthik Deivasigamani
Hi,

   I have a use case where we read messages from a Kafka topic and invoke a web service. The web-service call can take a couple of seconds and then returns, on average, 800 KB of data. This data is sent to another operator that parses it, and the result is then sent to a sink that saves the processed data in a NoSQL database. The app looks like this:

[image: Inline image 1]

Since the payload from the web service is large, a lot of data is transferred over the network, and this is becoming a bottleneck.

Let's say I have 6 slots per node and I would like to have 1 slot for the source, 3 slots for web-service calls, 2 for the parser and 1 for my sink. This way all the processing can happen locally and there is no network overhead. I have tried stream.forward(), but it requires the downstream operator to have the same parallelism as the one emitting data. Next I tried stream.rescale(), but that does not schedule the tasks as I would expect, given that the parallelisms of the operators are all multiples of each other (my Flink cluster has enough empty slots and capacity).
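To make the rescale() behaviour described above concrete, here is a minimal sketch of how a rescale-style (pointwise) connection could pair upstream and downstream subtasks. It is a simplified model in plain Python, not Flink code: the function name `rescale_targets` and the contiguous grouping are assumptions for illustration, and only the case where one parallelism is a multiple of the other is modelled. Note that it only describes which subtasks are connected; whether the connected subtasks end up on the same machine is decided by the scheduler and the slot configuration.

```python
from typing import Dict, List


def rescale_targets(upstream: int, downstream: int) -> Dict[int, List[int]]:
    """Map each upstream subtask index to the downstream subtasks it feeds.

    Simplified model of rescale-style partitioning. Only handles the case
    where one parallelism is a multiple of the other; the contiguous
    grouping is an assumption made for illustration.
    """
    if upstream <= 0 or downstream <= 0:
        raise ValueError("parallelism must be positive")
    if downstream % upstream == 0:
        # Fan-out: each upstream subtask feeds its own block of downstream subtasks.
        fan_out = downstream // upstream
        return {u: list(range(u * fan_out, (u + 1) * fan_out)) for u in range(upstream)}
    if upstream % downstream == 0:
        # Fan-in: each block of upstream subtasks feeds a single downstream subtask.
        fan_in = upstream // downstream
        return {u: [u // fan_in] for u in range(upstream)}
    raise ValueError("parallelisms are not multiples of each other")


print(rescale_targets(1, 3))  # source (1) -> web service (3)
print(rescale_targets(2, 1))  # parser (2) -> sink (1)
try:
    rescale_targets(3, 2)     # web service (3) -> parser (2)
except ValueError as err:
    print("3 -> 2:", err)
```

In this model the step from 3 web-service subtasks to 2 parser subtasks is the one that does not divide evenly, so at least one parser subtask would receive input from more than one upstream subtask.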


[image: Inline image 2]


Is there a way to schedule my tasks so that no data is transferred over the network? I was able to do this in Apache Storm by using localOrShuffle grouping. I am not sure how to achieve the same in Flink. Any pointers would be really helpful.

For now I have solved this problem by setting the same parallelism on the web-service operator, parser and sink, which causes Flink to chain these operators together and execute them in the same thread. But ideally I would like to have more instances of the slow operator and fewer instances of my fast operator.

~
Karthik
Re: stream partitioning to avoid network overhead

Urs Schoenenberger
Hi Karthik,

maybe I'm misunderstanding, but there are a few things in your
description that seem strange to me:

- Your "slow" operator seems to be slow not because it's compute-heavy,
but because it's waiting for a response. Is AsyncIO (
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
) an option for you? It's a more natural approach to this issue compared
to raising the parallelism.

- A Flink task slot can hold one parallel instance of each operator. If
you have two (non-chained) operators, one with parallelism 1 and one
with parallelism 2, you will use two slots, not three. You are not
wasting slots by having source parallelism 3 instead of 1.

- Therefore, it would in my opinion make sense to run all your operators
at the same parallelism. Operator chaining is desirable! Just run at the
max parallelism (the one you'd give to the webservice operator) and make
sure your Kafka topic has enough partitions to serve this parallelism.
If you're going the Async I/O way, you probably don't even need high
parallelism, but even with the sync implementation I can't think of a
big problem with running the chained operator at max parallelism.

- Minor point: I guess you have measured this, but I'm a little confused
why the intra-Flink network traffic would cause significant problems: It
seems to be the same amount of data that you're querying from the
external webservice, and that connection should be a bottleneck before
the intra-cluster network becomes one?
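The slot arithmetic from the list above can be sketched as follows. This is a back-of-the-envelope model in plain Python, not Flink code: it assumes all operators sit in one slot sharing group, and the function names and operator labels are made up for illustration.

```python
from typing import Dict


def slots_without_sharing(parallelism: Dict[str, int]) -> int:
    """If every parallel subtask needed its own slot: sum of all parallelisms."""
    return sum(parallelism.values())


def slots_with_sharing(parallelism: Dict[str, int]) -> int:
    """If a slot can hold one subtask of each operator: the highest parallelism."""
    return max(parallelism.values())


two_operators = {"first": 1, "second": 2}
mixed = {"source": 1, "webservice": 3, "parser": 2, "sink": 1}
uniform = {"source": 3, "webservice": 3, "parser": 3, "sink": 3}

for name, job in [("two operators", two_operators), ("mixed", mixed), ("uniform", uniform)]:
    print(name, "shared:", slots_with_sharing(job), "unshared:", slots_without_sharing(job))
```

Under this model the mixed layout and the uniform layout occupy the same number of shared slots, which is the reason raising the source parallelism to match the web-service operator does not cost extra slots.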


Best regards,
Urs

On 11.08.2017 04:54, Karthik Deivasigamani wrote:

> [...]

--
Urs Schönenberger - [hidden email] - +49 174 9955 692

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
Re: stream partitioning to avoid network overhead

Karthik Deivasigamani
Thanks Urs for your inputs.

Yes, we use the AsyncIO operator for our web-service calls.
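For readers unfamiliar with the idea, the following sketch illustrates why asynchronous calls help when an operator is slow because it waits rather than computes. It uses Python's asyncio and is not Flink's Async I/O API; `call_webservice`, `enrich`, the `capacity` limit and the simulated latency are all hypothetical stand-ins.

```python
import asyncio
import time
from typing import List


async def call_webservice(key: str, latency_s: float) -> str:
    """Hypothetical stand-in for the slow web-service call."""
    await asyncio.sleep(latency_s)  # the caller is waiting here, not computing
    return f"response-for-{key}"


async def enrich(keys: List[str], capacity: int, latency_s: float) -> List[str]:
    """Issue all calls concurrently with at most `capacity` requests in flight."""
    in_flight = asyncio.Semaphore(capacity)

    async def guarded(key: str) -> str:
        async with in_flight:
            return await call_webservice(key, latency_s)

    # gather() returns the results in the order of the input keys.
    return list(await asyncio.gather(*(guarded(k) for k in keys)))


keys = [f"msg-{i}" for i in range(20)]
start = time.perf_counter()
results = asyncio.run(enrich(keys, capacity=10, latency_s=0.1))
elapsed = time.perf_counter() - start
print(len(results), "responses in", round(elapsed, 2), "seconds")
```

A single worker keeps many requests in flight at once, so throughput is bounded by the in-flight limit and the service latency rather than by the number of parallel operator instances.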

We were considering increasing the number of Kafka partitions and raising the parallelism of the source to match the web-service operator. We weren't quite sure if this was the only way to achieve operator chaining. Thanks for clarifying this. We will surely try it.

Initially, when we ran the job, the parallelism of the web-service call was higher than that of the downstream operator (the parser). What we observed was that after about 48 hours the memory usage of the Flink TaskManager was ~98% and the system load was too high. When we chained the web-service and parser operators together by setting the same parallelism, this problem went away completely.

What we were wondering is whether setting the same parallelism on all the operators is the standard and desired way to achieve operator chaining, or whether there is an alternative approach to achieve the same.
~
Karthik



On Fri, Aug 11, 2017 at 7:25 PM, Urs Schoenenberger <[hidden email]> wrote:
> [...]

Re: stream partitioning to avoid network overhead

Aljoscha Krettek
Hi,

Quick remark: operator chaining is only possible when the upstream and downstream operators have the same parallelism. So having the same parallelism is not merely the standard or desired way; it is the only way to achieve chaining.
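The rule above can be written down as a small predicate. This is an illustrative model in plain Python, not Flink's implementation: the `Operator` class and `can_chain` function are hypothetical, and the extra conditions (a forward connection, the same slot sharing group, chaining not disabled) are not stated in this thread; they are included on the assumption that they mirror Flink's other chaining requirements.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Operator:
    """Hypothetical description of an operator in the job graph."""
    name: str
    parallelism: int
    slot_sharing_group: str = "default"


def can_chain(
    upstream: Operator,
    downstream: Operator,
    forward_connection: bool = True,
    chaining_enabled: bool = True,
) -> bool:
    """Return True if the two operators could run chained in one thread."""
    return (
        chaining_enabled
        and forward_connection  # no repartitioning between the two operators
        and upstream.parallelism == downstream.parallelism  # the rule from this thread
        and upstream.slot_sharing_group == downstream.slot_sharing_group
    )


webservice = Operator("webservice", parallelism=3)
print(can_chain(webservice, Operator("parser", parallelism=3)))
print(can_chain(webservice, Operator("parser", parallelism=2)))
```

With equal parallelism the pair qualifies for chaining; with 3 and 2 it does not, which matches the behaviour reported earlier in the thread.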

Best,
Aljoscha

On 16. Aug 2017, at 18:26, Karthik Deivasigamani <[hidden email]> wrote:

> [...]