SQL materialized upsert tables

Elias Levy
I noticed that there has been significant work on the SQL / Table subsystem and decided to evaluate it for one of our use cases.  The use case requires joining two streams, each of which can be considered a stream of table upserts.  Critically, when joining the streams, we only want to join against the latest value per key in one of the tables/streams.

Simply performing a join between the streams/tables is not sufficient, as it will generate results for records other than the latest one.  E.g., if you have two streams/tables with the schemas:

Telemetry [
  tstamp: Long
  item: String
  score: Int
  source: String
]

Scores [
  tstamp: Long
  item: String
  score: Int
]

tableEnv.sqlQuery("""
SELECT s.tstamp, s.item, s.score, t.source
FROM    Telemetry t INNER JOIN Scores s ON s.item = t.item
WHERE s.score <> t.score AND s.tstamp >= t.tstamp
""")

If the job receives three records from the Telemetry stream for the same item and source, and then a record from the Scores stream that matches the item and updates the score, it will generate three output records, even though we only want the latest Telemetry record to be considered.

If this were a regular database, we could do the following to get only the latest records from the Telemetry table:

tableEnv.sqlQuery("""
SELECT a.tstamp, a.item, a.score, a.source
FROM Telemetry a
  INNER JOIN (
    SELECT MAX(tstamp), item, source
    FROM Telemetry
    GROUP BY item, source
  ) b ON a.item = b.item AND a.source = b.source
""")

and then execute the previous query against this LatestTelemetry table instead of Telemetry.  But that does not work.  The query executes within Flink, but it still outputs multiple records, regardless of the order in which the records come into the source streams.

I am wondering if there is a way to accomplish this within Flink's SQL/Table abstractions.

Kafka Streams has the concept of a KTable, where records are considered upserts and update previous records that have the same key.  Thus, when you join against a KTable, you only join against the latest record for a given key, rather than all previous records for the key.

Thoughts?


Re: SQL materialized upsert tables

Elias Levy
[ Adding the list back in, as this clarifies my question ]

On Tue, Feb 20, 2018 at 3:42 PM, Darshan Singh <[hidden email]> wrote:
I am no expert in Flink, but I will try my best. The issue you mention will occur with all streaming systems, even with Kafka KTables; I use them a lot for similar requirements.

In Kafka, if you have a KTable on Telemetry with 3 records, join it with Scores (which could be a KTable or a KStream), and start your streaming query as mentioned above, it will give just 1 row, as expected. However, if a new value for the same key with a timestamp greater than the previous max is added to Telemetry, it will output the new value as well. That is the main idea of streaming anyway: you want to see the changed value. So once you started streaming you will get whatever is the outcome of your

Darshan,

Thanks for the reply.  I've already implemented this job using Kafka Streams, so I am aware of how KTables behave.  It would have helped if I had included some sample data in my post, so here it is.  If you have this data coming into Telemetry:

ts, item, score, source
0, item1, 1, source1
1, item1, 1, source1
2, item1, 1, source1

And this comes into Scores:

ts, item, score
3, item1, 3

Flink will output 3 records from the queries I mentioned:

(3, item1, 3, source1)
(3, item1, 3, source1)
(3, item1, 3, source1)


In contrast, if you run the query in Kafka Streams with Telemetry configured as a KTable keyed by (item, source), the output will be a single record.  In Telemetry, the record for key (item1, source1) at time 1 will overwrite the record at time 0, and the record at time 2 will overwrite the one at time 1.  By the time the record at time 3 comes in via Scores, it will be joined only with the record from time 2 in Telemetry.

Yes, it is possible for the Kafka Streams query to output multiple records if the records from the different streams are not time aligned, as Kafka Streams only makes a best effort to align the streams. But in the common case the output will be a single record.
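
The difference between the two behaviors can be reproduced outside either system. What follows is a plain-Java sketch, not Flink or Kafka Streams code: the names UpsertJoinDemo, appendOnlyJoin, upsertJoin and sampleTelemetry are made up for illustration, and the upsert variant assumes that arrival order decides which record wins. It runs the sample records above through an append-only join and through a join against a map keyed by (item, source).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: models the two join semantics with in-memory collections.
class UpsertJoinDemo {

    static final class Telemetry {
        final long tstamp; final String item; final int score; final String source;
        Telemetry(long tstamp, String item, int score, String source) {
            this.tstamp = tstamp; this.item = item; this.score = score; this.source = source;
        }
    }

    static final class Score {
        final long tstamp; final String item; final int score;
        Score(long tstamp, String item, int score) {
            this.tstamp = tstamp; this.item = item; this.score = score;
        }
    }

    // Same predicate as the SQL query: equal item, different score, s.tstamp >= t.tstamp.
    static boolean matches(Telemetry t, Score s) {
        return s.item.equals(t.item) && s.score != t.score && s.tstamp >= t.tstamp;
    }

    static String format(Telemetry t, Score s) {
        return "(" + s.tstamp + ", " + s.item + ", " + s.score + ", " + t.source + ")";
    }

    // Append-only semantics: every Telemetry record ever seen stays joinable.
    static List<String> appendOnlyJoin(List<Telemetry> telemetry, Score s) {
        List<String> out = new ArrayList<>();
        for (Telemetry t : telemetry) {
            if (matches(t, s)) out.add(format(t, s));
        }
        return out;
    }

    // Upsert semantics: a later record for the same (item, source) key replaces the earlier one.
    static List<String> upsertJoin(List<Telemetry> telemetry, Score s) {
        Map<String, Telemetry> latest = new LinkedHashMap<>();
        for (Telemetry t : telemetry) {
            latest.put(t.item + "|" + t.source, t); // assumption: arrival order decides
        }
        List<String> out = new ArrayList<>();
        for (Telemetry t : latest.values()) {
            if (matches(t, s)) out.add(format(t, s));
        }
        return out;
    }

    // The three Telemetry rows from the sample data.
    static List<Telemetry> sampleTelemetry() {
        List<Telemetry> rows = new ArrayList<>();
        rows.add(new Telemetry(0L, "item1", 1, "source1"));
        rows.add(new Telemetry(1L, "item1", 1, "source1"));
        rows.add(new Telemetry(2L, "item1", 1, "source1"));
        return rows;
    }

    public static void main(String[] args) {
        Score s = new Score(3L, "item1", 3);
        System.out.println(appendOnlyJoin(sampleTelemetry(), s)); // three identical records
        System.out.println(upsertJoin(sampleTelemetry(), s));     // one record
    }
}
```

The append-only variant corresponds to the three records Flink emits above; the keyed-map variant corresponds to the single record produced with a KTable.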


I think in Flink you can do the same: from your Telemetry stream/table you can create the LatestTelemetry table using similar SQL (I am sure it should give you the latest timestamp's data) as you did with the RDBMS, and then join it with the Scores table. You should get similar results to a KTable or any other streaming system.

Not sure if you missed it, but I actually executed the query to define the LatestTelemetry table in Flink using that query and joined against it.  The output was the same three records.

Re: SQL materialized upsert tables

Fabian Hueske-2
Hi Elias,

Flink does not have built-in support for upsert stream -> table conversions, yet. However, the community is working on that (see FLINK-8545 [1]).
With a workaround, you can also solve the issue with what Flink supports so far.

The approach with the MAX(tstamp) query was a good idea, but the query needs another join predicate on time.

tableEnv.sqlQuery("""
SELECT a.tstamp, a.item, a.score, a.source
FROM Telemetry a
  INNER JOIN (
    SELECT MAX(tstamp) AS maxT, item, source
    FROM Telemetry
    GROUP BY item, source
  ) b ON a.item = b.item AND a.source = b.source AND a.tstamp = maxT
""")

Otherwise, the table will have multiple records for each combination of item and source, as you noticed.

HOWEVER, you might not want to use the query above because it will accumulate all records from Telemetry in state and never clean them up.
The reason for this is that the query planner is not smart enough yet to infer that old records will never be joined (this is implied by the join condition on time).

A better solution is to use a custom user-defined aggregation function [2] (LAST_VAL) that returns the value associated with the max timestamp.

SELECT item, source, MAX(tstamp), LAST_VAL(score, tstamp)
FROM Telemetry
GROUP BY item, source

LAST_VAL would have an accumulator that stores a score and its associated timestamp.
When a new (score, timestamp) pair is accumulated, the UDAGG compares the timestamps and only updates the accumulator if the new timestamp is larger.
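
A minimal sketch of that accumulator logic in plain Java, with no Flink dependency. The method names follow the createAccumulator / accumulate / getValue shape of Flink's AggregateFunction, but the class names LastValAcc and LastVal are illustrative. Turning this into an actual UDAGG (extending AggregateFunction and registering it under the name LAST_VAL) is left out here and should be checked against the documentation of the Flink version in use.

```java
// Accumulator: the score seen with the largest timestamp so far.
class LastValAcc {
    long tstamp = Long.MIN_VALUE;
    Integer score = null; // null until the first record arrives
}

// Illustrative stand-in for a LAST_VAL user-defined aggregate (no Flink dependency).
class LastVal {

    public LastValAcc createAccumulator() {
        return new LastValAcc();
    }

    // Keep the new score only if its timestamp is strictly larger than the stored one.
    public void accumulate(LastValAcc acc, int score, long tstamp) {
        if (acc.score == null || tstamp > acc.tstamp) {
            acc.score = score;
            acc.tstamp = tstamp;
        }
    }

    // Result of the aggregate: the score that came with the max timestamp.
    public Integer getValue(LastValAcc acc) {
        return acc.score;
    }

    public static void main(String[] args) {
        LastVal f = new LastVal();
        LastValAcc acc = f.createAccumulator();
        f.accumulate(acc, 1, 0L);
        f.accumulate(acc, 5, 2L);
        f.accumulate(acc, 9, 1L); // out of order: older timestamp, ignored
        System.out.println(f.getValue(acc)); // prints 5
    }
}
```

Because the comparison is on the timestamp rather than on arrival order, a late record with an older timestamp does not replace a newer value.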

Btw. I'm not sure whether KStreams only updates the KTable if the update has a higher timestamp or just takes the last received record.
That might be an issue with out-of-order data. I would check the behavior if you expect data with out-of-order timestamps.

The upsert stream table conversion that we are working on will support event time (max timestamp) or processing time (last value) upserts.

Best, Fabian

Re: SQL materialized upsert tables

Elias Levy
On Wed, Feb 21, 2018 at 3:24 AM, Fabian Hueske <[hidden email]> wrote:
Hi Elias,

Flink does not have built-in support for upsert stream -> table conversions, yet. However, the community is working on that (see FLINK-8545 [1]).
With a workaround, you can also solve the issue with what Flink supports so far.

Fabian,

Thanks for the reply.  Great to see some progress in this area.  If we could implement this job in Flink rather than Kafka Streams, it would mean one less technology to support and to train our developers on, which is always a plus.

 
The approach with the MAX(tstamp) query was a good idea, but the query needs another join predicate on time.

tableEnv.sqlQuery("""
SELECT a.tstamp, a.item, a.score, a.source
FROM Telemetry a
  INNER JOIN (
    SELECT MAX(tstamp) AS maxT, item, source
    FROM Telemetry
    GROUP BY item, source
  ) b ON a.item = b.item AND a.source = b.source AND a.tstamp = maxT
""")

Otherwise, the table will have multiple records for each combination of item and source, as you noticed.

HOWEVER, you might not want to use the query above because it will accumulate all records from Telemetry in state and never clean them up.
The reason for this is that the query planner is not smart enough yet to infer that old records will never be joined (this is implied by the join condition on time).

Thanks for the correction.  But, yes, the indefinite accumulation is a deal breaker for using this approach.
 

A better solution is to use a custom user-defined aggregation function [2] (LAST_VAL) that returns the value associated with the max timestamp.

SELECT item, source, MAX(tstamp), LAST_VAL(score, tstamp)
FROM Telemetry
GROUP BY item, source

LAST_VAL would have an accumulator that stores a score and its associated timestamp.
When a new (score, timestamp) pair is accumulated, the UDAGG compares the timestamps and only updates the accumulator if the new timestamp is larger.

I'll give this approach a try.
 

Btw. I'm not sure whether KStreams only updates the KTable if the update has a higher timestamp or just takes the last received record.
That might be an issue with out-of-order data. I would check the behavior if you expect data with out-of-order timestamps.

I believe you are correct.  Kafka Streams will attempt to consume records from across partitions by aligning their timestamps, but it won't reorder records within a partition, which can be problematic if you can't guarantee ordered records within a partition.  While I talked about KTables, in reality the job I wrote is a combination of the Kafka Streams DSL and Processor API, to get around some of these issues.

The upsert stream table conversion that we are working on will support event time (max timestamp) or processing time (last value) upserts.

Excellent.
 

Re: SQL materialized upsert tables

Fabian Hueske-2
Hi Elias,

It would be great if you could let us know if the approach works.

Btw. I should point out that the join in a query like:

SELECT s.tstamp, s.item, s.score, t.source
FROM (
  SELECT item, source, MAX(tstamp) AS maxT, LAST_VAL(score, tstamp) AS score
  FROM Telemetry
  GROUP BY item, source
) t
INNER JOIN Scores s ON s.item = t.item
WHERE s.score <> t.score

is a full-history join and will fully materialize both inputs, the upserted Telemetry table and the append-only Scores table.
The query would hold a left join state with one row per item-source combination in Telemetry, and a right join state with one row for each row of Scores.
Moreover, each update on the Telemetry table will change the output for all rows of Scores that are affected by the update.
You can configure a state retention time. This will clean up state per key (in case of the join above based on the equi-join attribute) if a key did not receive new data within the retention time.
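
To make the effect of such a retention time concrete, here is a small plain-Java model of per-key state with idle cleanup. It is not Flink's implementation and uses no Flink API: the class IdleStateStore and its methods are hypothetical, and time is passed in explicitly as a number so the behavior is easy to follow. The actual setting is made on the query configuration; check the documentation of your Flink version for the exact call.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Illustrative model of per-key state with an idle retention time; not Flink's implementation.
class IdleStateStore {

    private static final class Entry {
        String value;
        long lastUpdate;
    }

    private final long retentionMillis;
    private final Map<String, Entry> state = new HashMap<>();

    public IdleStateStore(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Writing to a key refreshes its idle timer.
    public void put(String key, String value, long now) {
        Entry e = state.computeIfAbsent(key, k -> new Entry());
        e.value = value;
        e.lastUpdate = now;
    }

    // Returns the stored value, or null if the key is unknown or was cleaned up.
    public String get(String key) {
        Entry e = state.get(key);
        return e == null ? null : e.value;
    }

    // Drops every key that has not received data within the retention time.
    public int cleanup(long now) {
        int removed = 0;
        Iterator<Entry> it = state.values().iterator();
        while (it.hasNext()) {
            if (now - it.next().lastUpdate > retentionMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

With a retention of 1000 ms, a key last written at time 0 is gone after a cleanup at time 1500, so a Scores row arriving later for that item would find nothing to join with. That is the trade-off of bounding the state this way.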

The typical use case for full-history joins is to join two upserted or GROUP-BY-aggregated tables, i.e., tables that are updated but remain more or less constant in size.

Best, Fabian
