Joining and windowing multiple streams using DataStream API or Table API & SQL

Pieter Bonte
Hi all,

I’m trying to apply a window operator over multiple streams (more than two) and join these streams so that only events falling within the same window are joined. However, I have some questions about the time semantics of both the DataStream API and the Table API/SQL.

Let’s say we have three streams, A, B and C, and currently we have an A@0 (an A at timestamp 0), a B@5 and two C’s: C@6 and C@13.
We would like to join these streams when they fall within a sliding window of size 10 and slide 5.
Thus the first window W1[0,10) should contain A@0, B@5 and C@6.
The second window W2[5,15) should contain B@5, C@6 and C@13.
So only in the first window can all 3 streams be joined successfully.
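To make the intended semantics concrete, here is a minimal plain-Java sketch (no Flink dependency) that assigns each event of the example to its sliding windows of size 10 and slide 5 and lists which streams occur in each window. The class and method names are hypothetical, the window-start arithmetic assumes offset 0 and non-negative timestamps, and it only illustrates the semantics rather than reproducing Flink's own window assigner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java sketch of sliding event-time window assignment (size 10, slide 5, offset 0).
final class SlidingWindowExample {
    static final long SIZE = 10;
    static final long SLIDE = 5;

    // Start times of every window [start, start + SIZE) that contains ts.
    // Assumes ts >= 0 so that Java's % behaves like a mathematical modulo.
    static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % SLIDE); // latest window start at or before ts
        for (long start = lastStart; start > ts - SIZE; start -= SLIDE) {
            starts.add(start);
        }
        return starts;
    }

    // Maps each window start to the names of the streams that have an event in it.
    static Map<Long, TreeSet<String>> streamsPerWindow(String[] streams, long[] timestamps) {
        Map<Long, TreeSet<String>> result = new TreeMap<>();
        for (int i = 0; i < streams.length; i++) {
            for (long start : windowStarts(timestamps[i])) {
                result.computeIfAbsent(start, k -> new TreeSet<>()).add(streams[i]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] streams = {"A", "B", "C", "C"};
        long[] timestamps = {0, 5, 6, 13};
        streamsPerWindow(streams, timestamps).forEach((start, present) ->
                System.out.println("[" + start + "," + (start + SIZE) + ") -> " + present
                        + (present.size() == 3 ? "  (A, B and C can be joined)" : "")));
    }
}
```

Running main prints the windows [-5,5), [0,10), [5,15) and [10,20); only [0,10) contains A, B and C.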

However, I’m not able to mimic this behaviour using the DataStream or Table API.


Using the DataStream API, multiple streams can be joined by first joining stream A and stream B in one window and then joining the result with stream C in a second window, e.g.:

streamA
  .join(streamB)
    .where(<key selector>).equalTo(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // <- window Wab
    .apply(new JoinFunction() {...})
  .join(streamC)
    .where(<key selector>).equalTo(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // <- window Wabc
    .apply(new JoinFunction() {...})

However, according to the documentation on Window Joins [1] (and confirmed by debugging), the joined events from the first window (Wab) are assigned a new timestamp: the largest timestamp that still lies in the respective window, i.e. just before the window closes (9 for the window [0,10)).
Thus the result of joining A@0 and B@5 over the first window (Wab) will be AB@9. When joining with the C stream, AB@9 can be joined with both C@6 and C@13. This is not the behaviour I would like to obtain, since A happened at timestamp 0 and C@13 is 13 time units later, so the two never share a window of size 10.
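The effect of the cascade can be reproduced with a small plain-Java simulation. It is a sketch, not Flink code: the class name is hypothetical, and it models only the two rules stated above, namely that a window join emits one result per window containing both inputs and that each result is stamped with the window's largest timestamp (end minus one).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of two cascaded sliding-window joins (size 10, slide 5).
final class CascadedWindowJoin {
    static final long SIZE = 10;
    static final long SLIDE = 5;

    // Start times of all sliding windows containing ts (non-negative ts assumed).
    static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        for (long start = ts - (ts % SLIDE); start > ts - SIZE; start -= SLIDE) {
            starts.add(start);
        }
        return starts;
    }

    // Returns one "A@a,B@b,C@c" string per result of the two-stage join.
    static List<String> cascade(long[] as, long[] bs, long[] cs) {
        // Stage 1 (Wab): each window holding both a and b emits AB stamped with end - 1.
        List<long[]> abResults = new ArrayList<>(); // entries are {a, b, abTimestamp}
        for (long a : as) {
            for (long b : bs) {
                for (long start : windowStarts(a)) {
                    if (windowStarts(b).contains(start)) {
                        abResults.add(new long[] {a, b, start + SIZE - 1});
                    }
                }
            }
        }
        // Stage 2 (Wabc): join every AB result with C, using the new AB timestamp.
        List<String> out = new ArrayList<>();
        for (long[] ab : abResults) {
            for (long c : cs) {
                for (long start : windowStarts(ab[2])) {
                    if (windowStarts(c).contains(start)) {
                        out.add("A@" + ab[0] + ",B@" + ab[1] + ",C@" + c);
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints A@0,B@5,C@6 twice and A@0,B@5,C@13 once.
        cascade(new long[] {0}, new long[] {5}, new long[] {6, 13}).forEach(System.out::println);
    }
}
```

In this simulation A@0,B@5,C@13 is produced even though A@0 and C@13 never share a window, and C@6 is matched twice, because AB@9 and C@6 share both [0,10) and [5,15).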

Using the Table API or SQL, I think this can be solved using Interval Joins [2]. However, the windowing semantics seem to be different, as you need to choose one table (or stream) around which to apply an interval. Depending on which table the interval is applied to, different results can be obtained. For example, let’s say we have 3 table versions of our previous streams, i.e. A, B and C, each with a time attribute ‘ts’.
Applying an interval around table A would result in something like:

SELECT A.a, B.b, C.c
  FROM A, B, C
  WHERE A.x = B.x AND A.x = C.x AND
        A.ts BETWEEN B.ts - INTERVAL '5' MINUTE AND B.ts + INTERVAL '5' MINUTE AND
        A.ts BETWEEN C.ts - INTERVAL '5' MINUTE AND C.ts + INTERVAL '5' MINUTE

So if we want a window of 10, I think we should split the interval into 5 minutes before and 5 minutes after? However, now A@0 is not in the interval of C@6. Applying an interval of 10 would solve this problem. However, if we apply an interval of 10 both before and after, but choose to anchor the interval around B instead, we run into a different problem:

SELECT A.a, B.b, C.c
  FROM A, B, C
  WHERE A.x = B.x AND A.x = C.x AND
        B.ts BETWEEN A.ts - INTERVAL '10' MINUTE AND A.ts + INTERVAL '10' MINUTE AND
        B.ts BETWEEN C.ts - INTERVAL '10' MINUTE AND C.ts + INTERVAL '10' MINUTE
In this case B@5 is in the interval of A@0 but also in that of C@13, so A@0, B@5 and C@13 are joined even though A@0 and C@13 are 13 apart.
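The two interval conditions can be compared with the intended "same sliding window" semantics in a few lines of plain Java. Timestamps are plain numbers here (the unit does not matter), BETWEEN is treated as inclusive on both ends, and shareSlidingWindow is a hypothetical helper for size 10 and slide 5: the latest window start at or before the earliest event gives the window with the largest end that still contains that event, so the three events share a window exactly when the latest one falls before that end.

```java
// Plain-Java comparison (not Flink SQL) of the join conditions on timestamps a, b, c.
final class JoinPredicates {
    // Interval anchored on A with +/-5, as in the first query.
    static boolean aroundA5(long a, long b, long c) {
        return b - 5 <= a && a <= b + 5 && c - 5 <= a && a <= c + 5;
    }

    // Interval anchored on B with +/-10, as in the second query.
    static boolean aroundB10(long a, long b, long c) {
        return a - 10 <= b && b <= a + 10 && c - 10 <= b && b <= c + 10;
    }

    // True iff a, b and c lie in a common window [s, s + 10) with s a multiple of 5.
    // Assumes non-negative timestamps.
    static boolean shareSlidingWindow(long a, long b, long c) {
        long min = Math.min(a, Math.min(b, c));
        long max = Math.max(a, Math.max(b, c));
        long latestStart = min - (min % 5); // largest window start <= min
        return max < latestStart + 10;
    }

    public static void main(String[] args) {
        long[][] triples = {{0, 5, 6}, {0, 5, 13}};
        for (long[] t : triples) {
            System.out.printf("A@%d B@%d C@%d: aroundA5=%b aroundB10=%b shareWindow=%b%n",
                    t[0], t[1], t[2], aroundA5(t[0], t[1], t[2]),
                    aroundB10(t[0], t[1], t[2]), shareSlidingWindow(t[0], t[1], t[2]));
        }
    }
}
```

For (0, 5, 6) this prints aroundA5=false, aroundB10=true, shareWindow=true; for (0, 5, 13) it prints aroundA5=false, aroundB10=true, shareWindow=false, so neither interval formulation agrees with the window semantics on both triples.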

So my question is: how can I join multiple streams within a window so that the result behaves as if all the streams were joined in the same window? Should I write my own WindowOperator that, when two events are joined, assigns the smallest of their timestamps instead of the time at which the window closes?

Thanks in advance!

Kind regards,
Pieter

// code examples taken from [3].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/joining.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html
[3] https://stackoverflow.com/a/50879029
-------------------------------------------------------------------------
Dr. Ir. Pieter Bonte
Ghent University - imec
IDLab
iGent Tower - Department of Information Technology
Technologiepark-Zwijnaarde 126, B-9052 Ghent, Belgium
T: +32 9 33 14938; T Secr: +32 (0)9 33 14900
F: +32 9 33 14899
E: [hidden email]
W: IDLab.technology
W: IDLab.ugent.be

Re: Joining and windowing multiple streams using DataStream API or Table API & SQL

Till Rohrmann
Hi Pieter,

off the top of my head, I think the easiest way to solve this problem is to implement your own "window join" operation by first unioning all three streams and then applying a ProcessWindowFunction, similar to:

allEvents
    .keyBy((KeySelector<String, String>) value -> value)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .process(new ProcessWindowFunction<String, Result, String, TimeWindow>() {
        @Override
        public void process(
                String key,                 // key type matches the KeySelector's output (String)
                Context context,            // context.window() is the current TimeWindow
                Iterable<String> elements,  // all unioned A, B and C events of this window
                Collector<Result> out) throws Exception {
            // compute join result from elements
        }
    });
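The "compute join result from elements" step could look like the following plain-Java sketch. It assumes that the unioned elements carry the name of their source stream; TaggedEvent is a hypothetical wrapper for that, and the code is not tied to Flink's API. Because all elements of a keyed window share the same key, the three-way join reduces to the cross product of the A, B and C elements in that window.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the join step inside one keyed sliding window after the union.
final class WindowCrossJoin {
    // Hypothetical wrapper: the unioned stream must remember each element's origin.
    static final class TaggedEvent {
        final String stream;  // "A", "B" or "C"
        final String payload; // e.g. "A@0"

        TaggedEvent(String stream, String payload) {
            this.stream = stream;
            this.payload = payload;
        }
    }

    // All elements of a keyed window share the key, so the join is a cross product.
    static List<String> join(Iterable<TaggedEvent> elements) {
        List<String> as = new ArrayList<>();
        List<String> bs = new ArrayList<>();
        List<String> cs = new ArrayList<>();
        for (TaggedEvent e : elements) { // split the window content by source stream
            switch (e.stream) {
                case "A": as.add(e.payload); break;
                case "B": bs.add(e.payload); break;
                case "C": cs.add(e.payload); break;
                default: throw new IllegalArgumentException("unknown stream " + e.stream);
            }
        }
        List<String> results = new ArrayList<>(); // stays empty if any stream is missing
        for (String a : as) {
            for (String b : bs) {
                for (String c : cs) {
                    results.add(a + "|" + b + "|" + c);
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<TaggedEvent> w1 = List.of(
                new TaggedEvent("A", "A@0"), new TaggedEvent("B", "B@5"), new TaggedEvent("C", "C@6"));
        System.out.println(join(w1)); // [A@0|B@5|C@6]
    }
}
```

Applied to the content of W1 from the original example (A@0, B@5, C@6) this returns a single result, while W2 (B@5, C@6, C@13) returns nothing because it contains no A.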

@Timo is there an easier way using Flink's SQL or Table API?

Cheers,
Till

On Tue, Feb 16, 2021 at 3:36 PM Pieter Bonte <[hidden email]> wrote:

Re: Joining and windowing multiple streams using DataStream API or Table API & SQL

Pieter Bonte
Hi Till,

Thanks for the feedback.

My use case is a little bit trickier, as I can’t key all the streams by the same field.
Basically, I’m trying to evaluate continuous SPARQL queries, which consist of many joins. I’ve seen that SPARQL queries over RDF data have been discussed on the mailing list before, but not for RDF streams whose data is only valid within a certain time window.
To give you an example of a simple query, which looks for 'Persons that are sitting next to Students':
SELECT * WHERE {
  ?x a Person .
  ?x nextTo ?y .
  ?y a Student .
}
So everything that matches ‘?x a Person’ could be my A stream, ‘?x nextTo ?y’ a B stream and ‘?y a Student’ a C stream.
So for the first join, ‘?x a Person’ and ‘?x nextTo ?y’ need to be joined on variable ?x, i.e. the first field of the A and B streams, while ‘?x nextTo ?y’ and ‘?y a Student’ need to be joined on variable ?y, i.e. the second field of the B stream and the first field of the C stream.
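Within a single window, the two joins on different variables can be sketched in plain Java as below. This is not Flink code: the window's content is assumed to be already collected, each pattern is reduced to the bindings it produces (?x for A, (?x, ?y) for B, ?y for C), duplicate triples within a window are assumed absent, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java sketch of evaluating, over the content of ONE window,
//   ?x a Person . ?x nextTo ?y . ?y a Student .
// A binds ?x, B binds (?x, ?y), C binds ?y; A-B join on ?x, B-C join on ?y.
final class SparqlWindowJoin {
    // Returns one "x nextTo y" string per solution of the query in this window.
    static List<String> evaluate(List<String> persons, List<String[]> nextTo, List<String> students) {
        Set<String> xFromA = new HashSet<>(persons);  // build side for the join on ?x
        Set<String> yFromC = new HashSet<>(students); // build side for the join on ?y
        List<String> solutions = new ArrayList<>();
        for (String[] edge : nextTo) {                // probe with every B binding (?x, ?y)
            String x = edge[0];
            String y = edge[1];
            if (xFromA.contains(x) && yFromC.contains(y)) {
                solutions.add(x + " nextTo " + y);
            }
        }
        return solutions;
    }

    public static void main(String[] args) {
        List<String> persons = List.of("p1", "p2");
        List<String[]> nextTo = List.of(new String[] {"p1", "s1"}, new String[] {"p2", "p3"});
        List<String> students = List.of("s1");
        System.out.println(evaluate(persons, nextTo, students)); // [p1 nextTo s1]
    }
}
```

Because B is probed against both hash sets, each join can use its own variable, which is the part a single keyBy cannot express.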

As I can’t key the streams before windowing, I tried to combine the streams, window them and assign the window end time to each event. Then I separated the streams again and joined them using a CoProcessFunction. Based on the window end time, I know which events should be joined, as they are contained in the same window. I thought that I could store the events of both streams together with the last seen window end timestamp. If an event arrives with a larger window end time, I could join the previously seen events from both streams and clear them.
However, I see that the events arrive out of order in my CoProcessFunction, requiring me to store the content of several windows. I was a little bit surprised by this behaviour, but perhaps it’s because I’m using a fixed dataset for easy testing? The idea then was to store the content of multiple windows and use the progression of the watermark to know which windows could be cleared, so that I don’t need to store the content of all previous windows. However, it does not seem to be possible to combine a MapState with a list: the MapState would contain the end time of each window as key, and a list with the previously seen content of that window as value.
I’m guessing that there are more elegant and easier ways to solve this?
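The buffering idea can be sketched in plain Java, independent of Flink's CoProcessFunction API: each side keeps a sorted map from window end to that window's events, so out-of-order arrivals for several open windows can be held at once, and a watermark at or beyond a window end joins that window and evicts it. The class is hypothetical; treating a window stamped with end E as complete once the watermark reaches E is an assumption, and in a real operator this buffer would have to be kept in Flink state for fault tolerance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (hypothetical class, not Flink API) of buffering two inputs by
// window end so that several windows can be open at once despite out-of-order input.
final class WindowEndBuffer {
    // window end -> events of that window, kept sorted by window end
    private final TreeMap<Long, List<String>> left = new TreeMap<>();
    private final TreeMap<Long, List<String>> right = new TreeMap<>();

    void addLeft(long windowEnd, String event) {
        left.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(event);
    }

    void addRight(long windowEnd, String event) {
        right.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(event);
    }

    // Assumption: a window stamped with end E is complete once the watermark reaches E.
    // Joins every complete window, then evicts it from both buffers.
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        Map<Long, List<String>> completeLeft = left.headMap(watermark, true); // live view
        for (Map.Entry<Long, List<String>> window : completeLeft.entrySet()) {
            List<String> rightEvents = right.getOrDefault(window.getKey(), List.of());
            for (String l : window.getValue()) {
                for (String r : rightEvents) {
                    out.add(l + "|" + r + " (end=" + window.getKey() + ")");
                }
            }
        }
        completeLeft.clear();                   // also removes the entries from 'left'
        right.headMap(watermark, true).clear(); // drop complete right-side windows too
        return out;
    }

    public static void main(String[] args) {
        WindowEndBuffer buffer = new WindowEndBuffer();
        buffer.addRight(15, "C@13"); // arrives before the earlier window's data
        buffer.addLeft(10, "AB");
        buffer.addRight(10, "C@6");
        System.out.println(buffer.onWatermark(10)); // [AB|C@6 (end=10)]
        System.out.println(buffer.onWatermark(15)); // [] since window 15 had no left events
    }
}
```

Keying the maps by window end is what lets the watermark, rather than the arrival order, decide when a window's content can be dropped.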

For the Table API, I was able to find a solution as well. I first combine the streams, window them and again assign the window end time as the timestamp of each event. I split the streams and convert them to tables. As the window end times are assigned, I can use these to window the data using intervals, e.g. ‘A.ts BETWEEN B.ts AND B.ts’. This solution works, and it is easier to translate the SPARQL query to SQL. However, the program does not garbage-collect the outdated content of the streams, as a window in the DataStream API would; I see that my Flink program keeps growing in size. Is there a translation of Table API windows to DataStream windows?
Should I use the ‘setIdleStateRetentionTime’ configuration function to remove state?
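The window-end stamping used for the Table API solution can be checked with a short plain-Java sketch (not Table API code): each event is expanded into one copy per sliding window (size 10, slide 5) stamped with that window's exclusive end, and the copies are then equi-joined on the stamp, which is what a zero-width condition such as A.ts BETWEEN B.ts AND B.ts amounts to. Names are hypothetical and timestamps are assumed non-negative.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of joining window-end-stamped copies of A, B and C events.
final class WindowEndEquiJoin {
    static final long SIZE = 10;
    static final long SLIDE = 5;

    // Exclusive ends of all sliding windows that contain ts (non-negative ts assumed).
    static List<Long> windowEnds(long ts) {
        List<Long> ends = new ArrayList<>();
        for (long start = ts - (ts % SLIDE); start > ts - SIZE; start -= SLIDE) {
            ends.add(start + SIZE);
        }
        return ends;
    }

    // One "a,b,c@end" entry per window end shared by all three events (equi-join on end).
    static List<String> join(long a, long b, long c) {
        List<String> out = new ArrayList<>();
        for (long end : windowEnds(a)) {
            if (windowEnds(b).contains(end) && windowEnds(c).contains(end)) {
                out.add(a + "," + b + "," + c + "@" + end);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(join(0, 5, 6));  // [0,5,6@10]
        System.out.println(join(0, 5, 13)); // []
        System.out.println(join(5, 6, 7));  // [5,6,7@15, 5,6,7@10]
    }
}
```

The first two calls match the intended window semantics; the third shows that a triple sharing two overlapping windows is produced once per window, so duplicates may need handling downstream.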

Thanks in advance!

Kind regards,
Pieter

On 17 Feb 2021, at 10:00, Till Rohrmann <[hidden email]> wrote:
