Controlling the Materialization of JOIN updates

Benoît Paris-2
Hello all!

I'm trying to design a stream pipeline, and I am having trouble controlling when a JOIN triggers an update:

Setup:
  • The Event table; "probe side", "query side", the result of earlier stream processing
  • The DimensionAtJoinTimeX tables; of updating nature, "build side", the results of earlier stream processing
Joining them:

SELECT    * 
FROM      Event e 
LEFT JOIN DimensionAtJoinTime1 d1 
  ON      e.uid = d1.uid 
LEFT JOIN DimensionAtJoinTime2 d2 
  ON      e.uid = d2.uid  

The DimensionAtJoinTimeX tables are the result of earlier stream processing, possibly from the same Event table:

SELECT   uid, 
         hop_start(...), 
         sum(...)
FROM     Event e
GROUP BY uid, 
         hop(...)  

The Event table is defined as:

SELECT ...
FROM   EventRawInput i
WHERE  i.some_field = 'some_value'  

Requirements:
  • I need each JOIN to be executed only once, and only when a new row is appended to the probe / query / Event table.
  • I also need the full pipeline to be defined in SQL.
  • I very strongly prefer the Blink planner (mainly for Deduplication, TopN and LAST_VALUE features).
Problem exploration so far:
  • Option 1, "FOR SYSTEM_TIME AS OF" [1]: I need the solution to be in SQL, so this does not work out as is. I might explore the following, though: insert DimensionAtJoinTimeX into a special Sink, and use it in a LookupableTableSource (I'm at a loss on how to do that. Do I need an external kv store?).
  • Option 2, "FOR SYSTEM_TIME AS OF" [1], used in SQL: Is there a version of "FOR SYSTEM_TIME AS OF" readily usable in SQL? I might have missed something in the documentation.
  • Option 3, "LATERAL TABLE table_function" [2], on the Legacy planner: It does not work with two tables [3], and I would lose the Blink planner features.
  • Option 4, "LATERAL TABLE table_function" [2], on the Blink planner: It does not work when the "probe side" is the result of earlier stream processing [4].
  • Option 5: let a regular JOIN materialize the updates, and somehow filter out the ones coming from the build sides (I'm at a loss on how to do that).
  • Option 6, "TVR": I read this paper [5], which mentions "Time-Varying Relations". Speculating here: could there be a way to say that the build side is not a TVR, i.e. to declare the stream as somehow "static" while it is still being updated? (But I guess we're back to "FOR SYSTEM_TIME AS OF".)
  • Option 7: Are there any features in development, hints, or workarounds to control the JOIN updates that I have not considered so far?
  • Remark 1: I believe that FLINK-15112 and FLINK-14200 are the same kind of bug, even though they occur in different situations on different planners (same exception stack trace, in files that share the same historical parent from before the Blink fork). FLINK-15112 has a workaround, but FLINK-14200 does not. The existence of that workaround IMHO signals that there is a simple fix for both bugs. I have looked for it in Flink for a few days, without success so far. If you have pointers that would help me provide a fix, I'll gladly listen. What I have found so far: it revolves around Calcite-based Flink streaming rules transforming a temporal table function correlate into a Join on 2*Scan, and the crash happens when they encounter something that is not a table that can be readily scanned. Also, there are shenanigans around finding the right schema in the Catalog. But I am blocked now, and not accustomed to the Flink internal code (I would like to be, though, if Alibaba/Ververica are recruiting remote workers, wink wink, nudge nudge).
All opinions are very much welcome on all Options and Remarks!

Cheers, and a happy new year to all,
Benoît

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#processing-time-temporal-joins

[3] https://issues.apache.org/jira/browse/FLINK-15112

[4] https://issues.apache.org/jira/browse/FLINK-14200

[5] https://arxiv.org/pdf/1905.12133.pdf

Re: Controlling the Materialization of JOIN updates

Kurt Young
Hi Benoît, 

Before discussing all the options you listed, I'd like to understand more about your requirements.

The part I don't fully understand is that both your fact (Event) and dimension (DimensionAtJoinTimeX) tables come from the same table, Event or EventRawInput in your case. As a result, both your fact and dimension tables change over time.

My understanding is that when your DimensionAtJoinTimeX table emits its results, you don't want the result to change again. You want the fact table to join only with whatever data the dimension table currently has? I'm asking because your dimension table is computed with a window aggregation, but your join logic doesn't seem to care about the time attribute (LEFT JOIN DimensionAtJoinTime1 d1 ON e.uid = d1.uid). It's possible that a record with uid=x arrives from the Event table while the dimension table doesn't have any data for uid=x yet, due to the window aggregation. In that case, you don't want them to join?

Best,
Kurt


On Fri, Jan 3, 2020 at 1:11 AM Benoît Paris <[hidden email]> wrote:
[...]

Re: Controlling the Materialization of JOIN updates

Benoît Paris-2
Hi Kurt,

Thank you for your answer.

Yes, both the fact table and the dimension tables are changing over time; the example was meant to illustrate that they could change at the same time, but that we could still make a JOIN basically ignore updates from one specified side. The SQL is not the actual one I'm using and, as you said later on, I indeed don't deal with a time attribute and just want what's in the table at processing time.

At the moment my problem seems well on its way to being resolved, and the solution is going to be Option 4, "LATERAL TABLE table_function" on the Blink planner, as Jark Wu seems to be -elegantly- providing a patch for the FLINK-14200 NPE bug.
It was indeed about shenanigans on finding the proper RelOptSchema. Ah, I wish I had dived into the source code sooner; I could have had the pleasure and the opportunity to contribute to the Flink codebase.

Anyway, shout out to Jark for resolving the bug and providing a patch! I believe this will be a real enabler for CQRS architectures on Flink (we had subscriptions with regular joins, and this patch enables querying the same thing with very minor SQL modifications).
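
For readers of the thread, here is a rough sketch of what the Option 4 query shape could look like in Flink SQL. It rests on assumptions that are not stated in the thread: that the Event table exposes a processing-time attribute (called proctime here), that each dimension table has been registered as a temporal table function keyed on uid (the names Dimension1AtTime and Dimension2AtTime are hypothetical), and that the aggregated column is called total. As far as I know, in Flink 1.9 such functions are registered through the Table API rather than in SQL, and a temporal table function join behaves as an inner join, so Events without a matching dimension row would be dropped instead of being padded with NULLs as in the LEFT JOIN version.

```sql
-- Sketch only: Dimension1AtTime / Dimension2AtTime are hypothetical temporal
-- table functions over the dimension tables, keyed on uid; proctime and total
-- are assumed column names.
SELECT
  e.uid,
  d1.total AS total1,
  d2.total AS total2
FROM
  Event AS e,
  -- Each lookup is evaluated once per Event row, at that row's processing time;
  -- later updates to the dimension tables do not re-trigger the join.
  LATERAL TABLE (Dimension1AtTime(e.proctime)) AS d1,
  LATERAL TABLE (Dimension2AtTime(e.proctime)) AS d2
WHERE e.uid = d1.uid
  AND e.uid = d2.uid
```

Compared with the regular JOIN at the top of the thread, only the FROM and WHERE clauses change, which is what makes the switch a minor SQL modification.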

Kind regards
Benoît


On Sat, Jan 4, 2020 at 4:22 AM Kurt Young <[hidden email]> wrote:
[...]


--
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00   
http://benoit.paris
http://explicable.ml

Re: Controlling the Materialization of JOIN updates

Kurt Young
Good to hear that the patch resolved your issue. Looking forward to hearing more feedback from you!

Best,
Kurt


On Mon, Jan 6, 2020 at 5:56 AM Benoît Paris <[hidden email]> wrote:
[...]