flink sql: "slow" performance on Over Window aggregation with time attribute set to event time

Yan Zhou [FDS Science]

Hi,

I am using Flink SQL in my application. It simply reads records from a Kafka source, converts them to a table, then runs a query that computes an over-window aggregation for each record. A time-lag watermark assigner with a 10 ms time lag is used.

The performance is not ideal. The end-to-end latency, which is the difference between the time a record arrives at the Flink source and the time it arrives at the Flink sink, is around 250 ms (median). Please note that my query, which is an over-window aggregation, generates one result for each input record. I was expecting the latency to be less than 100 ms. I increased the number of queries by a factor of 100 and still see the same median end-to-end latency, with plenty of CPU and memory available. It seems to me that something is holding my application back.

However, when I use processing time as the time attribute without changing anything else, the latency drops to 50 ms. I understand that processing time should generally be faster. But in my event-time test the time lag is only 10 ms, which should mean the operators process events almost immediately after they arrive. And the classes that compute the over-window aggregation (ProcTimeBoundedRangeOver, RowTimeBoundedRowsOver, etc.) basically share the same logic. Why does choosing processing time or event time make such a big difference in end-to-end latency? And what is holding my application back when the time attribute is set to event time?

Below are my cluster and application setups. Thank you for your time.


The cluster:

The cluster runs in standalone mode on 7 servers. Each server has 24 cores and 240 GB of memory. There is 1 job manager and there are 6 task managers. Each task manager is allocated 12 cores, 120 GB of memory, and 6 task slots. HDFS also runs on these servers, on SSDs.


The application:

When an event arrives in Flink from Kafka, the application sets an ingestionTs on it. When the event arrives at the sink, the processing latency is calculated as System.currentTimeMillis() - ingestionTs. This value is taken as the end-to-end latency; it is recorded in a histogram metric and can be viewed in the Flink web portal. The RocksDB state backend is used. A time-lag watermark assigner with a time lag of 10 ms is used.
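The two measurements described above can be sketched in plain Java. This is only an illustration, not Flink's own watermark assigner: the class name TimeLagWatermarkSketch, its methods, and the injectable clock are all hypothetical, introduced so the arithmetic can be checked without a running job.

```java
import java.util.function.LongSupplier;

/** Plain-Java sketch of a time-lag watermark and the latency metric; not Flink code. */
final class TimeLagWatermarkSketch {
    private final long maxTimeLagMs;
    private final LongSupplier clock; // hypothetical injectable clock; a job would use System::currentTimeMillis

    TimeLagWatermarkSketch(long maxTimeLagMs, LongSupplier clock) {
        this.maxTimeLagMs = maxTimeLagMs;
        this.clock = clock;
    }

    /** The watermark trails the wall clock by a fixed lag, independent of the data seen. */
    long currentWatermark() {
        return clock.getAsLong() - maxTimeLagMs;
    }

    /** End-to-end latency as defined in the post: time at the sink minus ingestionTs. */
    long latencyAtSink(long ingestionTs) {
        return clock.getAsLong() - ingestionTs;
    }
}
```

With a 10 ms lag the watermark value is always 10 ms behind the clock at the moment it is computed; how often that value is actually emitted downstream is a separate setting.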


Custom Source 
-> Flat Map 
-> Timestamps/Watermarks 
-> (from: (id, ip, type, ingestionTs, eventTs) -> select: (id, ip, type, ingestionTs, eventTs))
--HASH-->
over:( PARTITION BY: ip,
ORDER BY: eventTs, 
RANGEBETWEEN 86400000 PRECEDING AND CURRENT ROW, 
select: (id, ip, eventTs, COUNT(*) AS w0$o0), ingestionTs) 
-> select: (id, eventTs, w0$o0 AS CNT), ingestionTs) 
-> to: Tuple2 
-> Sink: Unnamed

select id, eventTs, count(*) over (partition by id order by eventTs range between interval '24' hour preceding and current row) as cnt1 from myTable.
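For readers unfamiliar with over windows, the following plain-Java sketch shows what a per-key count over a 24-hour range computes: one output per input row. It is not how Flink implements the operator. It assumes rows of each key arrive in ascending eventTs order with distinct timestamps (real RANGE semantics treat rows with equal timestamps as peers that share a result), and the class name RangeOverCountSketch is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of COUNT(*) OVER (PARTITION BY key ORDER BY ts RANGE ... PRECEDING). */
final class RangeOverCountSketch {
    private final long rangeMs;
    // Per key, the timestamps still inside the range, oldest first.
    private final Map<String, Deque<Long>> timestampsByKey = new HashMap<>();

    RangeOverCountSketch(long rangeMs) {
        this.rangeMs = rangeMs;
    }

    /** Adds one row and returns the count of rows of this key with ts in [eventTs - rangeMs, eventTs]. */
    long add(String key, long eventTs) {
        Deque<Long> window = timestampsByKey.computeIfAbsent(key, k -> new ArrayDeque<>());
        window.addLast(eventTs);
        // Drop rows that fell out of the range; the lower bound itself is inclusive.
        while (window.peekFirst() < eventTs - rangeMs) {
            window.removeFirst();
        }
        return window.size();
    }
}
```

Because every input row produces exactly one output row, any delay in releasing a row shows up directly in the measured end-to-end latency.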


Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time

Fabian Hueske-2
Hi,

Flink advances watermarks based on all parallel source tasks. If one of the source tasks lags behind the others, the event time progresses as determined by the "slowest" source task.
Hence, records ingested from a faster task might have a higher processing latency.

Best, Fabian
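The rule Fabian describes can be written down as a small plain-Java sketch: a downstream operator's event-time clock is the minimum of the watermarks it has received from its inputs. This is an illustration only, not Flink's internal code, and the class and method names are hypothetical.

```java
import java.util.Arrays;

/** Plain-Java sketch of combining watermarks from parallel source tasks; not Flink code. */
final class MinWatermarkSketch {
    /** The combined watermark is the minimum over all inputs, so the slowest source decides. */
    static long combinedWatermark(long... sourceWatermarks) {
        return Arrays.stream(sourceWatermarks).min().orElse(Long.MIN_VALUE);
    }

    /** Event-time distance a record still has to wait for, given the current input watermarks. */
    static long eventTimeGap(long recordTs, long... sourceWatermarks) {
        return Math.max(0L, recordTs - combinedWatermark(sourceWatermarks));
    }
}
```

For example, with input watermarks 5000, 4990 and 4000, the combined watermark is 4000, so a record with timestamp 4995 from one of the faster sources cannot be finalized until the slowest input advances by another 995 ms of event time.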

Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time

Yan Zhou [FDS Science]

Hi Fabian,


Thank you for answering the question. However, I don't think it explains my situation. The source tasks' watermarks are set to 10 ms behind the system time; an assigner that allows a fixed amount of lateness is used. So even the slowest source task is not that slow.


Best

Yan

Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time

Fabian Hueske-2
Hi,

It is typically not a good idea to generate watermarks based on system (machine) time. Watermarks should be data driven.
As soon as the clock of one of your machines is 1 second behind the other machines, its watermarks will also be 1 second behind, and hence so will the complete stream.

Best, Fabian

Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time

Yan Zhou [FDS Science]

Hi Fabian,


Yes, it is typically not a good idea to generate watermarks based on system time. I set the watermark based on system time with very little delay to see how fast my application could process the data. All the servers are synced with NTP and differ from each other by only 1 ms. My assumption was that setting the watermark this way should give an end-to-end latency similar to using processing time.


However, that is not the case. The end-to-end latency of the application using processing time is only 1/3 of the other (50 ms vs. 150 ms). Why is that? Please help me understand.


Best

Yan


Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time

Fabian Hueske-2
I see...
Another issue might be the frequency with which you emit watermarks (in case you use a periodic watermark assigner).
You can set the interval with StreamExecutionEnvironment.getConfig.setAutoWatermarkInterval() [1].
However, keep in mind that each watermark is an additional record which might add overhead.

You can also play with the buffer timeout [2] to reduce the latency.
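To see why the emission interval matters even when the watermark lag is tiny, here is a rough plain-Java model, not Flink code. It assumes watermarks are emitted only at wall-clock multiples of the interval, each carrying the value now - lag, and that a record is released at the first emission whose watermark reaches its timestamp. Network buffering is ignored. The class and method names are hypothetical, and the 200 ms interval used in the note below is an assumed value (believed to be the default when event time is enabled), not something stated in this thread.

```java
/** Plain-Java model of the wait caused by periodic watermark emission; not Flink code. */
final class WatermarkIntervalSketch {
    /** Wall-clock time of the first periodic emission whose watermark (now - lag) reaches eventTs. */
    static long releaseTime(long eventTs, long lagMs, long intervalMs) {
        long earliest = eventTs + lagMs;                        // when now - lag first equals eventTs
        long ticks = (earliest + intervalMs - 1) / intervalMs;  // round up to the next emission tick
        return ticks * intervalMs;
    }

    /** Average wait over records whose timestamps are spread evenly across one interval. */
    static double averageWaitMs(long lagMs, long intervalMs) {
        long total = 0;
        for (long ts = 0; ts < intervalMs; ts++) {
            total += releaseTime(ts, lagMs, intervalMs) - ts;
        }
        return (double) total / intervalMs;
    }
}
```

Under these assumptions, a 10 ms lag with a 200 ms interval gives an average wait of 109.5 ms, while the same lag with a 10 ms interval gives 14.5 ms. The wait is dominated by roughly half the interval rather than by the lag itself.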


Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time

Yan Zhou [FDS Science]

Hi Fabian,


Thank you for the information. 

After setting the watermark interval to 10 ms and the buffer timeout to 0 ms, the end-to-end latency dropped to 5 ms. I am very happy with the result and will go from there.



Best

Yan


Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time

Fabian Hueske-2
Great! Thanks for reporting back :-)

A buffer timeout of 0 ms is quite aggressive. You might be sending buffers of (by default) 32 KB that contain just a single record.
Anyway, now you know the knobs to tune the latency.

Cheers, Fabian
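The trade-off Fabian points out can be illustrated with a deliberately simplified plain-Java model, not Flink's network stack. It assumes a periodic flusher that sends whatever is buffered once per timeout period, treats a timeout of 0 as "flush after every record", and ignores buffer capacity entirely. The class and method names are hypothetical.

```java
/** Simplified plain-Java model of buffer flushing under a timeout; not Flink code. */
final class BufferTimeoutSketch {
    /** Number of buffers sent for records arriving at the given non-negative times (ms, ascending). */
    static int buffersSent(long[] arrivalsMs, long timeoutMs) {
        if (timeoutMs <= 0) {
            return arrivalsMs.length; // every record is shipped in its own buffer
        }
        int sent = 0;
        long lastPeriod = -1;
        for (long t : arrivalsMs) {
            long period = t / timeoutMs; // the flush period this record falls into
            if (period != lastPeriod) {
                sent++;
                lastPeriod = period;
            }
        }
        return sent;
    }
}
```

In this model, 100 records arriving one per millisecond travel in 1 buffer with a 100 ms timeout, 10 buffers with a 10 ms timeout, and 100 buffers with a timeout of 0. Lower latency is paid for with more, emptier buffers on the network.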

2018-03-15 21:00 GMT+01:00 Yan Zhou [FDS Science] <[hidden email]>:

Hi Fabian,


Thank you for the information. 

After setting the watermark interval to 10ms and buffer timeout to 0 ms, the end-to-end latency is reduced to 5ms. I am very happy with the result and will go from there.



Best

Yan


From: Fabian Hueske <[hidden email]>
Sent: Thursday, March 15, 2018 11:55:12 AM

To: Yan Zhou [FDS Science]
Cc: [hidden email]
Subject: Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time
 
I see...
Another issue might be the frequency with which you emit watermarks (in case you use a periodic watermark assigner).
You can set the interval with StreamExecutionEnvironment.getConfig.setAutoWatermarkInterval() [1].
However, keep in mind that each watermark is an additional record which might add overhead.

You can also play with the buffer timeout [2] to reduce the latency.
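To see why the watermark interval can dominate latency, here is a rough back-of-the-envelope model in plain Java. It is a sketch under stated assumptions, not a description of Flink internals: watermarks are assumed to be emitted at wall-clock multiples of the interval, each watermark is assumed to equal the wall clock minus the configured lag, and a record is assumed to be released by the event-time operator only once a watermark at or past its timestamp arrives. Network buffering, processing cost and multiple sources are ignored. The class WatermarkWait is a hypothetical name; the 200ms interval in main is an illustrative value (as far as I know it is what Flink uses by default once event time is enabled).

```java
/** Hypothetical model of how long a record waits for the next periodic watermark. */
final class WatermarkWait {

    /**
     * Wait until the first watermark that covers eventTs.
     * Watermarks are emitted at k * intervalMillis (k = 1, 2, ...) with value emitTime - lagMillis.
     */
    static long waitMillis(long eventTs, long intervalMillis, long lagMillis) {
        long needed = eventTs + lagMillis; // earliest wall-clock time whose watermark reaches eventTs
        long ticks = Math.max(1, (needed + intervalMillis - 1) / intervalMillis); // ceiling division
        return ticks * intervalMillis - eventTs;
    }

    /** Average wait over records arriving at every millisecond of one interval. */
    static double averageWaitMillis(long intervalMillis, long lagMillis) {
        long total = 0;
        for (long t = 0; t < intervalMillis; t++) {
            total += waitMillis(t, intervalMillis, lagMillis);
        }
        return (double) total / intervalMillis;
    }

    public static void main(String[] args) {
        System.out.println("interval 200ms, lag 10ms: " + averageWaitMillis(200, 10));
        System.out.println("interval 10ms, lag 10ms: " + averageWaitMillis(10, 10));
    }
}
```

Under this model the average wait is roughly half the interval plus the lag, so with a 10ms lag the interval, not the lag, decides how long records sit in the operator. Processing time has no such wait, which fits the gap reported in this thread.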


2018-03-15 19:36 GMT+01:00 Yan Zhou [FDS Science] <[hidden email]>:

Hi Fabian,


Yes, it is typically not a good idea to generate watermarks based on system time. I was setting the watermark based on system time with very little delay to see how fast my application could process the data. All the servers are synced with NTP and differ from each other by only 1ms. My assumption was that setting the watermark this way should give an end-to-end latency similar to that of processing time.


However, that is not the case. The end-to-end latency of the application using processing time is only 1/3 of the other (50ms vs 150ms). Why is that? Please help me understand.


Best

Yan


From: Fabian Hueske <[hidden email]>
Sent: Wednesday, March 14, 2018 12:02:01 PM

To: Yan Zhou [FDS Science]
Cc: [hidden email]
Subject: Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time
 
Hi,

It is typically not a good idea to generate watermarks based on system (machine) time. Watermarks should be data driven.
As soon as the clock of one of your machines is 1 second behind the other machines, its watermarks will also be 1 second behind, and hence so will the complete stream.

Best, Fabian

2018-03-14 18:40 GMT+01:00 Yan Zhou [FDS Science] <[hidden email]>:

Hi Fabian,


Thank you for answering the question. However, I don't think it explains my situation. The source tasks' watermarks are set to 10ms behind the system time; an assigner allowing a fixed amount of lateness is used. So even the slowest source task is not that slow.


Best

Yan






From: Fabian Hueske <[hidden email]>
Sent: Wednesday, March 14, 2018 3:28 AM
To: Yan Zhou [FDS Science]
Cc: [hidden email]
Subject: Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time
 
Hi,

Flink advances watermarks based on all parallel source tasks. If one of the source tasks lags behind the others, the event time progresses as determined by the "slowest" source task.
Hence, records ingested from a faster task might have a higher processing latency.
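The "slowest source wins" rule can be illustrated with a small sketch: an operator with several input channels tracks the latest watermark seen on each channel and advances its own event time to the minimum of them. This is a simplified model in plain Java, not Flink's actual code, and the class InputWatermarks is a hypothetical name.

```java
import java.util.Arrays;

/** Hypothetical model of how an operator combines watermarks from parallel inputs. */
final class InputWatermarks {
    private final long[] perChannel;

    InputWatermarks(int channels) {
        if (channels <= 0) {
            throw new IllegalArgumentException("need at least one channel");
        }
        perChannel = new long[channels];
        Arrays.fill(perChannel, Long.MIN_VALUE); // no watermark received yet
    }

    /** Records a watermark from one channel and returns the operator's resulting event time. */
    long update(int channel, long watermark) {
        perChannel[channel] = Math.max(perChannel[channel], watermark); // watermarks never go backwards
        return current();
    }

    /** The operator's event time: the minimum across all input channels. */
    long current() {
        long min = Long.MAX_VALUE;
        for (long wm : perChannel) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        InputWatermarks inputs = new InputWatermarks(3);
        inputs.update(0, 1_000L);
        inputs.update(1, 1_010L);
        // Channel 2 lags by about a second, so the operator's event time lags with it.
        System.out.println(inputs.update(2, 0L));
    }
}
```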

Best, Fabian

2018-03-14 1:36 GMT+01:00 Yan Zhou [FDS Science] <[hidden email]>:
