increasing parallelism increases the end2end latency in flink sql


Yan Zhou [FDS Science]

Hi,


My application assigns timestamps to Kafka events with a BoundedOutOfOrdernessTimestampExtractor and then converts them to a table. Finally, a Flink SQL OVER-window aggregation runs against the table.


When I double the parallelism of my Flink application, the end-to-end latency doubles. What could be the cause? It seems to me that this is because the watermark advances more slowly in the operators generated by SQL.


In email thread [1], it was said that Flink SQL removes the internal DataStream timestamp and moves it into the record. Does the query ignore the internal DataStream watermarks and regenerate them from the record? Say there are two operator instances for one task: do they have the same watermark?


I found a similar issue in email thread [2].


Best

Yan


[1]: https://lists.apache.org/thread.html/c5182628272f018037ce832290f9b19976fe5c268aa72760635cf3cc@%3Cuser.flink.apache.org%3E

[2]: https://lists.apache.org/thread.html/bf789df06e979f80caf23f6b2c8676aaf07b007ae0d450ae887b6a82@%3Cuser.flink.apache.org%3E
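On the question of whether parallel operator instances share a watermark: in Flink each parallel instance keeps its own watermark, and an instance fed by several upstream channels advances only to the minimum of the latest watermarks received across those channels. The plain-Java sketch below models that rule without any Flink dependency; `MinWatermarkDemo`, `InputWatermarks`, and `onWatermark` are hypothetical names, not Flink API. It shows one plausible way more parallelism can delay results: more upstream subtasks means more channels, and a single lagging channel holds the combined watermark back.

```java
import java.util.Arrays;

/**
 * Hypothetical model of how one downstream operator instance combines watermarks
 * from several input channels: its event-time clock is the minimum of the latest
 * watermark seen on each channel, and it never moves backwards.
 */
public class MinWatermarkDemo {

    /** Tracks the latest watermark per input channel and the combined watermark. */
    static final class InputWatermarks {
        private final long[] perChannel;
        private long combined = Long.MIN_VALUE;

        InputWatermarks(int numChannels) {
            if (numChannels < 1) {
                throw new IllegalArgumentException("need at least one channel");
            }
            perChannel = new long[numChannels];
            Arrays.fill(perChannel, Long.MIN_VALUE); // nothing received yet
        }

        /** Records a watermark from one channel and returns the operator's watermark. */
        long onWatermark(int channel, long watermark) {
            perChannel[channel] = Math.max(perChannel[channel], watermark);
            long min = Arrays.stream(perChannel).min().getAsLong();
            combined = Math.max(combined, min); // monotonic
            return combined;
        }
    }

    public static void main(String[] args) {
        // Two upstream subtasks; channel 1 lags behind channel 0.
        InputWatermarks op = new InputWatermarks(2);
        System.out.println(op.onWatermark(0, 10_000)); // channel 1 not seen yet
        System.out.println(op.onWatermark(1, 4_000));
        System.out.println(op.onWatermark(0, 20_000));
        System.out.println(op.onWatermark(1, 15_000));
    }
}
```

In the run in `main`, the operator's watermark stays at 4000 even after channel 0 jumps to 20000, because channel 1 has only reached 4000.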


Re: increasing parallelism increases the end2end latency in flink sql

Yan Zhou [FDS Science]

The BoundedOutOfOrdernessTimestampExtractor is assigned to the DataStream after the Kafka consumer. The graph looks like this:

KafkaSource -> map2Pojo -> BoundedOutOfOrdernessTimestampExtractor -> Table -> ......
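For reference, the bounded-out-of-orderness idea behind this extractor can be modeled in plain Java: the watermark trails the largest event timestamp seen so far by a fixed bound and never moves backwards. This is a sketch of the concept under that assumption, not Flink's source; the class name `BoundedOutOfOrdernessSketch` is hypothetical, and the 5-second bound in `main` is an arbitrary example value. Because each parallel instance of the extractor holds its own copy of this state, a subtask that receives no new events keeps reporting the same watermark.

```java
/**
 * Plain-Java sketch of the bounded-out-of-orderness concept: the watermark is the
 * highest timestamp seen so far minus a fixed bound, and it never decreases.
 * Illustrative only; this is not the code of Flink's extractor class.
 */
public class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("bound must be non-negative");
        }
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that subtracting the bound cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Called per event: remembers the largest event timestamp seen. */
    public long extractTimestamp(long eventTimestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMs);
        return eventTimestampMs;
    }

    /** Called periodically: max timestamp minus the bound, never decreasing. */
    public long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs;
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(5_000);
        wm.extractTimestamp(12_000);
        wm.extractTimestamp(9_000); // out-of-order event, max stays at 12000
        System.out.println(wm.currentWatermark()); // 7000
        wm.extractTimestamp(20_000);
        System.out.println(wm.currentWatermark()); // 15000
    }
}
```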




From: Yan Zhou [FDS Science] <[hidden email]>
Sent: Wednesday, May 23, 2018 3:21:24 PM
To: [hidden email]
Subject: increasing parallelism increases the end2end latency in flink sql
 


Re: increasing parallelism increases the end2end latency in flink sql

Timo Walther
Hi Yan,

SQL should not be the cause here. It is true that Flink removes the timestamp from a record when it enters the SQL API, but this timestamp is set again before time-based operations such as OVER windows. Watermarks are not touched. I think your issue is related to [2]. One explanation I can imagine is that a watermark has to be broadcast to all workers; depending on the frequency of watermark generation, a higher parallelism also causes a larger number of watermarks to be broadcast. Which Flink version are you using? Can you run your tests on the latest Flink RC? There were some improvements to the buffer management that might help. I will loop in Piotr for that.

Regards,
Timo
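The broadcast explanation above can be put into rough numbers. Assuming an all-to-all exchange (for example a hash partitioning ahead of the OVER window) and that every upstream subtask's watermark advances at every periodic interval, each upstream subtask sends each watermark to every downstream channel, so the message count scales with the product of the two parallelisms and the result is an upper bound. The 200 ms interval and the helper `watermarksPerSecond` are hypothetical values and names chosen for illustration, not figures from the thread.

```java
/**
 * Rough model of watermark traffic across one all-to-all exchange. Assumes every
 * upstream subtask emits a new watermark each interval and broadcasts it to all
 * downstream channels, so the result is an upper bound.
 */
public class WatermarkBroadcastCount {

    /** Upper bound on watermark messages per second crossing one exchange. */
    static double watermarksPerSecond(int upstreamParallelism,
                                      int downstreamParallelism,
                                      long watermarkIntervalMs) {
        if (watermarkIntervalMs <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        double emissionsPerSubtaskPerSecond = 1_000.0 / watermarkIntervalMs;
        // Each emission is copied to every downstream channel.
        return (double) upstreamParallelism * downstreamParallelism * emissionsPerSubtaskPerSecond;
    }

    public static void main(String[] args) {
        long intervalMs = 200; // hypothetical interval, chosen for illustration
        System.out.println(watermarksPerSecond(4, 4, intervalMs)); // 80.0
        System.out.println(watermarksPerSecond(8, 8, intervalMs)); // 320.0
    }
}
```

Doubling both parallelisms from 4 to 8 quadruples this bound from 80 to 320 watermark messages per second on that exchange; whether that volume matters in practice depends on the buffer handling mentioned above.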


On 24.05.18 at 01:02, Yan Zhou [FDS Science] wrote:
