I have a execution flow (Streaming Job) with parallelism 1.
source -> map -> partitioner -> flatmap -> sink Since adding partitioner will start new thread but partitioner is spending average of 2 to 4 minutes while moving data from map to flat map . For more details about this : http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-Custom-Partitioner-in-Streaming-with-parallelism-1-adding-latency-td13766.html In some link here : https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks they have mentioned that the PipelinedSubpartition is a pipelined implementation to support streaming data exchange. The SpillableSubpartition is a blocking implementation to support batch data exchange. I am not sure how would i use these or reduce latency from map -> partitioner -> flatmap . |
So let's get the obvious question out of the way:
Why are you adding a partitioner when your parallelism is 1? On 22.06.2017 11:58, sohimankotia wrote: > I have a execution flow (Streaming Job) with parallelism 1. > > source -> map -> partitioner -> flatmap -> sink > > Since adding partitioner will start new thread but partitioner is spending > average of 2 to 4 minutes while moving data from map to flat map . > > For more details about this : > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-Custom-Partitioner-in-Streaming-with-parallelism-1-adding-latency-td13766.html > > In some link here : > https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks > > they have mentioned that the > > PipelinedSubpartition is a pipelined implementation to support streaming > data exchange. The SpillableSubpartition is a blocking implementation to > support batch data exchange. > > I am not sure how would i use these or reduce latency from map -> > partitioner -> flatmap . > > > > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. > |
Hi Chesnay,
I have data categorized on some attribute(Key in partition ) which will be having n possible values. As of now job is enabled for only one value of that attribute . In couple of days we will enable all values of attribute with more parallelism so each attribute's type data get processed in single instance . So, while running with parallelism 1 I just observed the 2 to 4 minutes latency from map -> p -> flatmap |
Hi,
What do you mean by latency and how are you measuring this in your job? Best, Aljoscha > On 22. Jun 2017, at 14:23, sohimankotia <[hidden email]> wrote: > > Hi Chesnay, > > I have data categorized on some attribute(Key in partition ) which will be > having n possible values. As of now job is enabled for only one value of > that attribute . In couple of days we will enable all values of attribute > with more parallelism so each attribute's type data get processed in single > instance . > > So, while running with parallelism 1 I just observed the 2 to 4 minutes > latency from map -> p -> flatmap > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p13916.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. |
So In following execution flow :
source -> map -> partitioner -> flatmap -> sink I am attaching current time to tuple while emitting from map function , and then extracting that timestamp value from tuple in flatmap at a very first step . Then I am calculating difference between time attached while emitting from map and entering into flatmap . |
I think then there is something going wrong somewhere. Usually people get millisecond latencies even when they have a “keyBy” or shuffle in-between operations (which are not different to a custom partitioner at the system level).
What kind of sources/sinks is your program using? Best, Aljoscha > On 27. Jun 2017, at 17:04, sohimankotia <[hidden email]> wrote: > > So In following execution flow : > > source -> map -> partitioner -> flatmap -> sink > > I am attaching current time to tuple while emitting from map function , and > then extracting that timestamp value from tuple in flatmap at a very first > step . Then I am calculating difference between time attached while emitting > from map and entering into flatmap . > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14025.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. |
Source is KafKa .
FlatMap has HBase Lookup Sink is Kafka . I tried to get stats over the days . I see that almost 40 % were having latency of 0 seconds , 10 % 0-30 sec, approx 10% 30-60 sec and 10 % around 60 - 120 sec and 30 % around 120 - 210 secs . |
I see, what I consider highly likely here is that the lookup to HBase is the bottleneck. If the lookup takes to long events “sit in a queue” between the map and flatMap operations. If you replace the HBase lookup by some dummy code you should see the latency go away.
The reason you don’t see latency when you don’t have a custom partitioner is that here the map and flatMap are chained together: sending an event from one operator to the next is basically just a function call and there is therefore no queue that can be filled that makes events “wait”. Best, Aljoscha > On 28. Jun 2017, at 15:17, sohimankotia <[hidden email]> wrote: > > Source is KafKa . > FlatMap has HBase Lookup > Sink is Kafka . > > I tried to get stats over the days . I see that almost 40 % were having > latency of 0 seconds , 10 % 0-30 sec, approx 10% 30-60 sec and 10 % around > 60 - 120 sec and 30 % around 120 - 210 secs . > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14036.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. |
I had same concern regarding HBase . So I also added metric to measure Hbase op time in flatmap (Basically complete flatmap op).
From metrics I see that aprox 96 % time op time was under 1 sec. (Still I can do a dummy run without HBase op . But did these timing make sense?) |
Even if the request time to HBase is just a couple of milliseconds this will add up and the elements sitting in the buffer between the map and flatMap will have high perceived latency, yes.
> On 28. Jun 2017, at 16:54, sohimankotia <[hidden email]> wrote: > > I had same concern regarding HBase . So I also added metric to measure Hbase > op time in flatmap (Basically complete flatmap op). > > From metrics I see that aprox 96 % time op time was under 1 sec. (Still I > can do a dummy run without HBase op . But did these timing make sense?) > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14040.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. |
So , it means when elements leave
map => sit in buffer (due to partitioner) => enter flatmap Since Hbase op in flat map are taking time lets say 1 sec per operation , next element will not be read from buffer until HBase Op is done. Due to this Hbase op , time to enter to flat map from map will get accumulated for elements waiting in buffer . Let me know if I understood correctly ? |
Yes, this is exactly right!
> On 29. Jun 2017, at 17:42, sohimankotia <[hidden email]> wrote: > > So , it means when elements leave > > map => sit in buffer (due to partitioner) => enter flatmap > > Since Hbase op in flat map are taking time lets say 1 sec per operation , > next element will not be read from buffer until HBase Op is done. > > Due to this Hbase op , time to enter to flat map from map will get > accumulated for elements waiting in buffer . > > Let me know if I understood correctly ? > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14072.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. |
Few last doubts :
1. So If I increase parallelism latency will decrease because load will get distributed ? 2. But if load will increase latency will also increase if parallelism is more ? 3. Let's say If I remove partitioner , and Hbase Op is still there in Flat map . Then also this latency would be there ? 4. If yes in point 3 , then latency would from reading elements from kafka ? |
Yes, in the end the requests to HBase are the bottle neck and the latency will manifest in different places of the job depending on where there is a queue. If there is a queue between map and flatMap elements will sit there and wait and you’ll see latency there. If map and flatMap are chained you will see the latency in the form of Kafka consumer lag (Kafka itself is the queue here).
Best, Aljoscha > On 29. Jun 2017, at 18:30, sohimankotia <[hidden email]> wrote: > > Few last doubts : > > 1. So If I increase parallelism latency will decrease because load will get > distributed ? > 2. But if load will increase latency will also increase if parallelism is > more ? > 3. Let's say If I remove partitioner , and Hbase Op is still there in Flat > map . Then also this latency would be there ? > 4. If yes in point 3 , then latency would from reading elements from kafka ? > > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14076.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. |
Free forum by Nabble | Edit this page |