Hi,

I have a use case where we read messages from a Kafka topic and invoke a web service. The web-service call can take a couple of seconds and returns on average 800 KB of data. This data is sent to another operator, which does the parsing, and then to a sink, which saves the processed data in a NoSQL DB. The app looks like this:

[image: Inline image 1]

Since the payload from the web service is large, a lot of data is transferred over the network, and this is becoming a bottleneck.

Let's say I have 6 slots per node and I would like to have 1 slot for the source, 3 slots for web-service calls, 2 for the parser and 1 for the sink. This way all the processing can happen locally and there is no network overhead. I have tried stream.forward(), but it requires the downstream operator to have the same parallelism as the one emitting data. Next I tried stream.rescale(), and that does not schedule the tasks as I would expect, given that the parallelisms of the operators are all multiples of each other (my Flink cluster has enough empty slots and capacity).

[image: Inline image 2]

Is there a way to schedule my tasks so that there is no data transfer over the network? I was able to do this in Apache Storm by using localOrShuffle grouping. I'm not sure how to achieve the same in Flink. Any pointers would be really helpful.

For now I have solved this problem by giving the web-service operator, the parser and the sink the same parallelism, which causes Flink to chain these operators together and execute them in the same thread. But ideally I would like to have more instances of the slow operator and fewer instances of my fast operator.

~ Karthik
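To make the stream.rescale() behaviour described in the question concrete, here is a toy model in Java (a sketch, not Flink code). RescaleModel and targets() are hypothetical names. It assumes each upstream subtask feeds a contiguous block of downstream subtasks, which agrees with the examples in the rescale() Javadoc (2 → 4 and 4 → 2); for parallelisms that do not divide evenly, such as 3 → 2, Flink's exact split may differ. The model only says which subtasks are connected, not where the scheduler places them, which is the part that did not behave as expected here.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not Flink code) of rescale()-style pointwise wiring.
 * Assumption: each upstream subtask sends to a contiguous block of
 * downstream subtasks (fan-out), or several upstream subtasks share
 * one downstream subtask (fan-in).
 */
final class RescaleModel {

    /** For each upstream subtask index, the downstream subtask indices it sends to. */
    static List<List<Integer>> targets(int upstream, int downstream) {
        if (upstream <= 0 || downstream <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < upstream; i++) {
            result.add(new ArrayList<>());
        }
        if (downstream >= upstream) {
            // Fan-out: upstream i owns downstream range [i*d/u, (i+1)*d/u).
            for (int i = 0; i < upstream; i++) {
                int start = i * downstream / upstream;
                int end = (i + 1) * downstream / upstream;
                for (int j = start; j < end; j++) {
                    result.get(i).add(j);
                }
            }
        } else {
            // Fan-in: downstream j reads from upstream range [j*u/d, (j+1)*u/d).
            for (int j = 0; j < downstream; j++) {
                int start = j * upstream / downstream;
                int end = (j + 1) * upstream / downstream;
                for (int i = start; i < end; i++) {
                    result.get(i).add(j);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Parallelisms from the question: source 1 -> web service 3 -> parser 2 -> sink 1.
        System.out.println("source -> webservice: " + targets(1, 3)); // [[0, 1, 2]]
        System.out.println("webservice -> parser: " + targets(3, 2)); // [[0], [1], [1]]
        System.out.println("parser -> sink:       " + targets(2, 1)); // [[0], [0]]
    }
}
```

Note how, in this model, web-service subtasks 1 and 2 both feed parser subtask 1: with uneven parallelisms the load on the downstream subtasks is uneven too.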
Hi Karthik,
Maybe I'm misunderstanding, but there are a few things in your description that seem strange to me:

- Your "slow" operator seems to be slow not because it's compute-heavy, but because it's waiting for a response. Is AsyncIO ( https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html ) an option for you? It's a more natural approach to this issue than raising the parallelism.

- A Flink task slot can hold one instance of each operator. If you have two (non-chained) operators, one with parallelism 1 and one with parallelism 2, you will use two slots, not three. You are not wasting slots by having source parallelism 3 instead of 1.

- Therefore, in my opinion it would make sense to run all your operators at the same parallelism. Operator chaining is desirable! Just run at the max parallelism (the one you'd give to the web-service operator) and make sure your Kafka topic has enough partitions to serve this parallelism. If you're going the Async I/O way, you probably don't even need high parallelism, but even with the sync implementation I can't think of a big problem with running the chained operator at max parallelism.

- Minor point: I guess you have measured this, but I'm a little confused about why the intra-Flink network traffic would cause significant problems. It seems to be the same amount of data that you're querying from the external web service, so shouldn't that connection become a bottleneck before the intra-cluster network does?

Best regards,
Urs

On 11.08.2017 04:54, Karthik Deivasigamani wrote:
> [...]

--
Urs Schönenberger - [hidden email]
TNG Technology Consulting GmbH
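Urs's slot arithmetic can be checked with a few lines of Java. This is a back-of-the-envelope sketch that assumes every operator stays in the default slot sharing group, so that one slot can hold one parallel instance of each operator; SlotEstimate and its methods are hypothetical helpers, not Flink API.

```java
/**
 * Back-of-the-envelope slot count (hypothetical helper, not a Flink API).
 * Assumes all operators are in the default slot sharing group, so one slot
 * can host one parallel instance of every operator.
 */
final class SlotEstimate {

    /** Slots needed when all operators share slots: the highest parallelism. */
    static int withSlotSharing(int... parallelisms) {
        int max = 0;
        for (int p : parallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    /** Slots needed if every parallel instance had a slot to itself. */
    static int withoutSlotSharing(int... parallelisms) {
        int sum = 0;
        for (int p : parallelisms) {
            sum += p;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Urs's example: two non-chained operators with parallelism 1 and 2.
        System.out.println(withSlotSharing(1, 2) + " vs " + withoutSlotSharing(1, 2)); // 2 vs 3
        // Karthik's plan: source 1, web service 3, parser 2, sink 1.
        System.out.println(withSlotSharing(1, 3, 2, 1) + " vs " + withoutSlotSharing(1, 3, 2, 1)); // 3 vs 7
        // Everything at the web-service parallelism.
        System.out.println(withSlotSharing(3, 3, 3, 3)); // 3
    }
}
```

Under that assumption, raising every operator to the web-service parallelism (3, 3, 3, 3) still needs only 3 slots, which is why running the whole pipeline chained at max parallelism costs no extra slots.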
Thanks, Urs, for your input. Yes, we use the AsyncIO operator for our web-service calls. When we first ran the job, the web-service call parallelism was higher than the parallelism of the downstream operator (the parser). What we observed was that after about 48 hours, memory usage by the Flink TaskManager was ~98% and the system load was too high. When we chained the web-service and parser operators together by setting the same parallelism, this problem went away completely. What we are wondering is whether setting the same parallelism on all the operators is the standard and desired way to achieve operator chaining, or whether there is an alternative approach to achieve the same.

~ Karthik

On Fri, Aug 11, 2017 at 7:25 PM, Urs Schoenenberger <[hidden email]> wrote:
> Hi Karthik,
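Urs's remark that Async I/O may remove the need for high parallelism, and the memory pressure Karthik describes, can both be put into rough numbers with Little's law (requests in flight = request rate × latency). This is a sketch only: the 2-second latency, the in-flight limit of 100 per subtask (what Flink's Async I/O operator calls its capacity) and the 150 requests/s target are assumed values, 800 KB is taken as 800,000 bytes, and AsyncSizing is a hypothetical helper. It does not claim to explain the memory usage observed in this thread; it only shows the order of magnitude of response data a full in-flight window can hold.

```java
/**
 * Rough sizing arithmetic for an Async I/O stage (hypothetical helper).
 * Little's law: requests in flight = request rate x latency.
 */
final class AsyncSizing {

    /** Max requests/s one subtask can sustain with `inFlight` concurrent calls. */
    static double maxRatePerSubtask(int inFlight, double latencySeconds) {
        return inFlight / latencySeconds;
    }

    /** Subtasks needed to sustain `targetRate` requests/s, rounded up. */
    static int subtasksNeeded(double targetRate, double latencySeconds, int inFlightPerSubtask) {
        return (int) Math.ceil(targetRate * latencySeconds / inFlightPerSubtask);
    }

    /** Upper bound on response bytes held when every in-flight slot carries a response. */
    static long bufferedBytes(int subtasks, int inFlightPerSubtask, long bytesPerResponse) {
        return (long) subtasks * inFlightPerSubtask * bytesPerResponse;
    }

    public static void main(String[] args) {
        double latency = 2.0;          // "a couple of seconds" per call (assumed value)
        long responseBytes = 800_000L; // ~800 KB average payload from the thread
        // Synchronous call: one request in flight per subtask.
        System.out.println(maxRatePerSubtask(1, latency));        // 0.5
        // Async I/O with an assumed in-flight limit of 100 per subtask.
        System.out.println(maxRatePerSubtask(100, latency));      // 50.0
        // Assumed target of 150 requests/s.
        System.out.println(subtasksNeeded(150, latency, 100));    // 3
        // Response data if all 3 x 100 in-flight slots hold an 800 KB payload.
        System.out.println(bufferedBytes(3, 100, responseBytes)); // 240000000
    }
}
```

With these assumed numbers, a single async subtask replaces about a hundred synchronous ones, while a full window across three subtasks already corresponds to roughly 240 MB of response payloads, so the in-flight limit is worth sizing together with the TaskManager memory.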
Hi,
Quick remark: operator chaining is only possible when the upstream and downstream operators have the same parallelism. So having the same parallelism is not just the standard or desired way; it's the only way to achieve chaining.

Best,
Aljoscha
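Aljoscha's rule can be expressed as a predicate. The sketch below is a simplified model in Java (16+, for the record type), loosely based on the conditions Flink checks when it decides whether to chain two operators: forward partitioning, equal parallelism, a single input on the downstream operator, the same slot sharing group, and chaining not being disabled. Edge, Partitioning and isChainable are hypothetical names, and the model ignores per-operator chaining strategies such as startNewChain() and disableChaining().

```java
/**
 * Simplified model of when two operators can be chained.
 * Hypothetical types, loosely modeled on Flink's chaining checks; not Flink API.
 */
final class ChainingModel {

    enum Partitioning { FORWARD, REBALANCE, RESCALE, HASH }

    /** Hypothetical description of the edge between an upstream and a downstream operator. */
    record Edge(int upstreamParallelism,
                int downstreamParallelism,
                Partitioning partitioning,
                int downstreamInputCount,
                boolean sameSlotSharingGroup,
                boolean chainingEnabled) {}

    static boolean isChainable(Edge e) {
        return e.chainingEnabled()
                && e.downstreamInputCount() == 1               // downstream has exactly one input
                && e.sameSlotSharingGroup()                    // both in the same slot sharing group
                && e.partitioning() == Partitioning.FORWARD    // records stay on the same subtask index
                && e.upstreamParallelism() == e.downstreamParallelism(); // Aljoscha's rule
    }

    public static void main(String[] args) {
        // Web service (3) -> parser (3), forward: can run as one chained task in one thread.
        System.out.println(isChainable(new Edge(3, 3, Partitioning.FORWARD, 1, true, true))); // true
        // Web service (3) -> parser (2), rescale: records are handed to a separate task.
        System.out.println(isChainable(new Edge(3, 2, Partitioning.RESCALE, 1, true, true))); // false
    }
}
```

The second case is the original 3 → 2 layout: under this model it cannot be chained whatever partitioning is chosen, which matches Aljoscha's answer.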