How are time-windowed joins and over-window aggregations implemented in Flink SQL?


Yan Zhou [FDS Science]

Hi,


I am building a data pipeline with a lot of streaming joins and over-window aggregations, and Flink SQL supports both. However, I could not find equivalent DataStream APIs (please point them out if they exist). This confuses me, because I assume the SQL logical plan is translated into a graph of operators or transformations.


Could someone explain how these two kinds of SQL queries are implemented or translated into low-level code (operators or transformations)? I am asking because I have implemented these features without SQL and the performance looks good. I would love to migrate to SQL, but I want to understand the implementation well first. Any information, hints, or links are appreciated.


1. Time-Windowed Join

The DataStream API only provides streaming joins within the same window, but the SQL API (time-windowed join) can join two streams across different time ranges. Below is a sample query from the official documentation, in which an order may precede its shipment by up to 4 hours. Is it implemented with a CoProcessFunction or a TwoInputOperator that buffers events for a certain period?


SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

2. Over-Window Aggregation
There is no similar feature in the DataStream API. How is this implemented? Does it use keyed state to buffer previous events and pull the records when needed? How is sorting handled?


Best
Yan




Re: how does time-windowed join and Over-Window Aggregation implemented in flink SQL?

Xingcan Cui
Hi Yan Zhou,

As you may have noticed, the SQL-level stream join is not built on top of any join API but is implemented with the low-level CoProcessFunction (see TimeBoundedStreamInnerJoin.scala). The pipeline is generated in DataStreamWindowJoin.scala.
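
To make the buffering idea concrete, here is a minimal sketch in plain Python. It is not Flink's code and does not use any Flink API: the class `TimeBoundedJoinSketch` and its methods `on_order` and `on_shipment` are hypothetical names, and ordinary dictionaries stand in for keyed state. It only models how a two-input function could buffer rows from each side and probe the opposite buffer with the time bounds from the sample query. State cleanup is deliberately left out, so the buffers in this sketch grow without bound.

```python
from collections import defaultdict

HOUR_MS = 60 * 60 * 1000


class TimeBoundedJoinSketch:
    """Toy model of a keyed two-input join with one row buffer per side.

    Models the predicate from the sample query:
    o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
    """

    def __init__(self, lower_ms=-4 * HOUR_MS, upper_ms=0):
        # Bounds on (ordertime - shiptime) for a pair to join.
        self.lower_ms = lower_ms
        self.upper_ms = upper_ms
        # Stand-ins for keyed state: join key -> list of (timestamp, row).
        self.orders = defaultdict(list)
        self.shipments = defaultdict(list)

    def _matches(self, order_ts, ship_ts):
        return self.lower_ms <= order_ts - ship_ts <= self.upper_ms

    def on_order(self, key, ts, row):
        """Buffer the order, then probe the buffered shipments for this key."""
        self.orders[key].append((ts, row))
        return [(row, s_row) for s_ts, s_row in self.shipments[key]
                if self._matches(ts, s_ts)]

    def on_shipment(self, key, ts, row):
        """Buffer the shipment, then probe the buffered orders for this key."""
        self.shipments[key].append((ts, row))
        return [(o_row, row) for o_ts, o_row in self.orders[key]
                if self._matches(o_ts, ts)]


join = TimeBoundedJoinSketch()
out = []
out += join.on_order(1, 1 * HOUR_MS, "order-1")          # nothing buffered yet
out += join.on_shipment(1, 3 * HOUR_MS, "ship-1")        # 2 hours after the order
out += join.on_shipment(1, 6 * HOUR_MS, "ship-1-late")   # 5 hours after the order
out += join.on_shipment(2, 2 * HOUR_MS, "ship-2")        # different join key
```

Only the pair `("order-1", "ship-1")` ends up in `out`, because the second shipment falls outside the 4-hour bound and the third has a different key. Each row is buffered before probing so that rows arriving later on the other side can still find it.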

Regarding the over-window aggregation, most of the implementations can be found in this package. The pipeline is generated in DataStreamOverAggregate.scala.
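
As an illustration of how buffering and sorting could work for an event-time over window, here is a small plain-Python sketch. It is not taken from the Flink sources: `RowsOverSumSketch`, `on_row`, and `on_watermark` are hypothetical names, dictionaries stand in for keyed state, and the sketch assumes that rows are held back until a watermark passes their timestamp and are then processed in timestamp order. It models a query of the form `SUM(x) OVER (PARTITION BY key ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)`.

```python
from collections import defaultdict, deque


class RowsOverSumSketch:
    """Toy model of SUM(x) OVER (PARTITION BY key ORDER BY rowtime
    ROWS BETWEEN n PRECEDING AND CURRENT ROW)."""

    def __init__(self, preceding=2):
        self.preceding = preceding
        # Stand-ins for keyed state.
        self.pending = defaultdict(list)   # key -> [(ts, value)] not yet emitted
        self.window = defaultdict(deque)   # key -> values of the last n+1 emitted rows

    def on_row(self, key, ts, value):
        """Buffer only; rows may arrive out of timestamp order."""
        self.pending[key].append((ts, value))

    def on_watermark(self, watermark):
        """Emit every buffered row with ts <= watermark, in timestamp order."""
        out = []
        for key in sorted(self.pending):
            ready = sorted(r for r in self.pending[key] if r[0] <= watermark)
            self.pending[key] = [r for r in self.pending[key] if r[0] > watermark]
            win = self.window[key]
            for ts, value in ready:
                win.append(value)
                if len(win) > self.preceding + 1:
                    win.popleft()          # row fell out of the ROWS range
                out.append((key, ts, value, sum(win)))
        return out


agg = RowsOverSumSketch(preceding=2)
agg.on_row("u1", 30, 3)   # arrives first but is not the earliest row
agg.on_row("u1", 10, 1)
agg.on_row("u1", 20, 2)
agg.on_row("u1", 50, 5)   # stays buffered until the watermark reaches 50
first = agg.on_watermark(30)
agg.on_row("u1", 40, 4)
second = agg.on_watermark(50)
```

Sorting happens only when the watermark releases rows, so out-of-order arrivals within the watermark delay do not affect the result. Recomputing `sum(win)` per row keeps the sketch short; an incremental accumulator would avoid the repeated summation.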

In summary, they use built-in state tools to cache the rows/intermediate results and clean/fire them when necessary.
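
The "clean" part can be sketched for the sample join as follows. This is a plain-Python illustration under stated assumptions, not Flink's cleanup logic: the helper names `order_expiry`, `shipment_expiry`, and `clean` are hypothetical, and a watermark `w` is taken to mean that no later row carries a timestamp less than or equal to `w`. With the bounds -4 hours <= ordertime - shiptime <= 0, an order can only match shipments up to 4 hours after it, and a shipment can only match orders up to its own timestamp, so a buffered row can be dropped once the watermark reaches that limit.

```python
HOUR_MS = 60 * 60 * 1000
LOWER_MS, UPPER_MS = -4 * HOUR_MS, 0   # bounds on (ordertime - shiptime)


def order_expiry(order_ts):
    """Latest shiptime that can still join with an order at order_ts."""
    return order_ts - LOWER_MS


def shipment_expiry(ship_ts):
    """Latest ordertime that can still join with a shipment at ship_ts."""
    return ship_ts + UPPER_MS


def clean(buffer, watermark, expiry):
    """Keep only rows that a future row from the other side could still match.

    Assumes every future row has a timestamp strictly greater than watermark.
    """
    return [(ts, row) for ts, row in buffer if expiry(ts) > watermark]


orders = [(1 * HOUR_MS, "o1"), (5 * HOUR_MS, "o2")]
shipments = [(2 * HOUR_MS, "s1"), (7 * HOUR_MS, "s2")]
watermark = 6 * HOUR_MS

kept_orders = clean(orders, watermark, order_expiry)
kept_shipments = clean(shipments, watermark, shipment_expiry)
```

With the watermark at 6 hours, `o1` (joinable until hour 5) and `s1` (joinable until hour 2) are dropped, while `o2` and `s2` are kept.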

Hope that helps.

Best,
Xingcan

On Thu, Dec 14, 2017 at 7:09 AM, Yan Zhou [FDS Science] <[hidden email]> wrote:


Re: how does time-windowed join and Over-Window Aggregation implemented in flink SQL?

Yan Zhou [FDS Science]

Thanks for the information.

 

Best

Yan

 

From: Xingcan Cui <[hidden email]>
Date: Wednesday, December 13, 2017 at 6:02 PM
To: "Yan Zhou [FDS Science]" <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: how does time-windowed join and Over-Window Aggregation implemented in flink SQL?

 
