I noticed that there has been significant work on the SQL / Table subsystem and decided to evaluate it for one of our use cases. The use case requires joining two streams, each of which can be considered a stream of table upserts. Critically, when joining the streams, we only want to join against the latest value per key in one of the tables/streams.
Simply performing a join between the streams/tables is not sufficient, as it will produce results for records other than the latest one. E.g., suppose you have two streams/tables with the following schemas:

Telemetry [
  tstamp: Long
  item: String
  score: Int
  source: String
]

Scores [
  tstamp: Long
  item: String
  score: Int
]

tableEnv.sqlQuery("""
  SELECT s.tstamp, s.item, s.score, t.source
  FROM Telemetry t INNER JOIN Scores s ON s.item = t.item
  WHERE s.score <> t.score AND s.tstamp >= t.tstamp
""")

If the job receives three records from the Telemetry stream for the same item and source, followed by a record from the Scores stream with a matching item that updates the score, it will generate three output records, even though we only want the latest record from the source to be considered.

If this were a regular database, we could do the following to get only the latest records from the Telemetry table:

tableEnv.sqlQuery("""
  SELECT a.tstamp, a.item, a.score, a.source
  FROM Telemetry a INNER JOIN (
    SELECT MAX(tstamp), item, source
    FROM Telemetry
    GROUP BY item, source
  ) b ON a.item = b.item AND a.source = b.source
""")

and then execute the previous query against this LatestTelemetry table instead of Telemetry. But that does not work. The query executes in Flink, but it still outputs multiple records, regardless of the order in which the records arrive on the source streams.

I am wondering if there is a way to accomplish this within Flink's SQL/Table abstractions. Kafka Streams has the concept of a KTable, where records are considered upserts and update previous records that have the same key. Thus, when you join against a KTable, you only join against the latest record for a given key, rather than all previous records for the key.

Thoughts?
[ Adding the list back in, as this clarifies my question ]
Darshan,

Thanks for the reply. I've already implemented this job using Kafka Streams, so I am aware of how KTables behave. It would have helped if I had included some sample data in my post, so here it is. If you have this data coming into Telemetry:

ts, item,  score, source
0,  item1, 1,     source1
1,  item1, 1,     source1
2,  item1, 1,     source1

And this comes into Scores:

ts, item,  score
3,  item1, 3

Flink will output three records from the queries I mentioned:

(3, item1, 3, source1)
(3, item1, 3, source1)
(3, item1, 3, source1)

In contrast, if you run the query in Kafka Streams with Telemetry configured as a KTable keyed by (item, source), the output will be a single record. In Telemetry, the record for key (item1, source1) at time 1 will overwrite the record at time 0, and the record at time 2 will overwrite the one at time 1. By the time the record at time 3 comes in via Scores, it will be joined only with the record from time 2 in Telemetry.

Yes, it is possible for the Kafka Streams query to output multiple records if the records from the different streams are not time-aligned, as Kafka Streams only makes a best-effort attempt to align the streams. But in the common case the output will be a single record.
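To make the difference concrete, here is a minimal plain-Scala model (not Flink or Kafka Streams code) that replays the sample data above through both join semantics. It assumes the time-aligned case, i.e., all Telemetry records are processed before the Scores record; the object, class, and method names are illustrative only.

```scala
// Plain-Scala model of append vs. upsert join semantics, using the sample data above.
// Not Flink or Kafka Streams API; names are hypothetical.
object UpsertVsAppendJoin {
  final case class Telemetry(ts: Long, item: String, score: Int, source: String)
  final case class Score(ts: Long, item: String, score: Int)
  final case class Out(ts: Long, item: String, score: Int, source: String)

  val telemetry: Seq[Telemetry] = Seq(
    Telemetry(0, "item1", 1, "source1"),
    Telemetry(1, "item1", 1, "source1"),
    Telemetry(2, "item1", 1, "source1"))
  val scores: Seq[Score] = Seq(Score(3, "item1", 3))

  // Join predicate of the original query: same item, changed score, newer timestamp.
  private def matches(s: Score, t: Telemetry): Boolean =
    s.item == t.item && s.score != t.score && s.ts >= t.ts

  // Append semantics: every Telemetry row ever received stays joinable.
  def appendJoin(ts: Seq[Telemetry], ss: Seq[Score]): Seq[Out] =
    for { s <- ss; t <- ts if matches(s, t) } yield Out(s.ts, s.item, s.score, t.source)

  // Upsert semantics: only the last row received per (item, source) key is kept,
  // like a KTable keyed by (item, source).
  def upsertJoin(ts: Seq[Telemetry], ss: Seq[Score]): Seq[Out] = {
    val latest: Map[(String, String), Telemetry] =
      ts.foldLeft(Map.empty[(String, String), Telemetry]) { (m, t) =>
        m.updated((t.item, t.source), t)
      }
    for { s <- ss; t <- latest.values.toSeq if matches(s, t) }
      yield Out(s.ts, s.item, s.score, t.source)
  }

  def main(args: Array[String]): Unit = {
    println(appendJoin(telemetry, scores).size) // 3
    println(upsertJoin(telemetry, scores).size) // 1
  }
}
```

The append join emits one result per stored Telemetry row, which matches the three identical records Flink produced; the upsert join keeps only the row from time 2.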
Not sure if you missed it, but I actually used that query in Flink to define the LatestTelemetry table and then joined against it. The output was the same three records.
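Why the LatestTelemetry definition still yields three rows can be checked with a small plain-Scala model of that query (illustrative names, not Flink API). Joining back on (item, source) alone matches every Telemetry row in the group, because all of them share the key that MAX(tstamp) was computed for; only an additional predicate on the timestamp narrows the result to the latest row.

```scala
// Plain-Scala model of: Telemetry a JOIN (SELECT MAX(tstamp) AS maxT, item, source
// FROM Telemetry GROUP BY item, source) b ON a.item = b.item AND a.source = b.source
// [AND a.tstamp = b.maxT]. Names are hypothetical.
object MaxSubqueryJoin {
  final case class Telemetry(ts: Long, item: String, score: Int, source: String)

  val telemetry: Seq[Telemetry] = Seq(
    Telemetry(0, "item1", 1, "source1"),
    Telemetry(1, "item1", 1, "source1"),
    Telemetry(2, "item1", 1, "source1"))

  // The subquery b: maximum timestamp per (item, source).
  def maxPerKey(rows: Seq[Telemetry]): Map[(String, String), Long] =
    rows.groupBy(t => (t.item, t.source)).map { case (k, g) => k -> g.map(_.ts).max }

  // The outer join; matchOnTime toggles the extra "a.tstamp = b.maxT" predicate.
  def latestTelemetry(rows: Seq[Telemetry], matchOnTime: Boolean): Seq[Telemetry] = {
    val b = maxPerKey(rows)
    rows.filter { a =>
      b.get((a.item, a.source)).exists(maxT => !matchOnTime || a.ts == maxT)
    }
  }

  def main(args: Array[String]): Unit = {
    println(latestTelemetry(telemetry, matchOnTime = false).size) // 3
    println(latestTelemetry(telemetry, matchOnTime = true).size)  // 1
  }
}
```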
Hi Elias,

Flink does not have built-in support for upsert stream -> table conversions yet. However, the community is working on that (see FLINK-8545 [1]).

With a workaround, you can also solve the issue with what Flink supports so far. The approach with the MAX(tstamp) query was a good idea, but the query needs another join predicate on time.

tableEnv.sqlQuery("""
  SELECT a.tstamp, a.item, a.score, a.source
  FROM Telemetry a INNER JOIN (
    SELECT MAX(tstamp) AS maxT, item, source
    FROM Telemetry
    GROUP BY item, source
  ) b ON a.item = b.item AND a.source = b.source AND a.tstamp = b.maxT
""")

Otherwise, the table will have multiple records for each combination of item and source, as you noticed.

HOWEVER, you might not want to use the query above, because it will accumulate all records from Telemetry in state and never clean them up. The reason for this is that the query planner is not yet smart enough to infer that old records will never be joined (this is implied by the join condition on time).

A better solution is to use a custom user-defined aggregation function [2] (LAST_VAL) that returns the value associated with the maximum timestamp.

SELECT item, source, MAX(tstamp) AS tstamp, LAST_VAL(score, tstamp) AS score
FROM Telemetry
GROUP BY item, source

LAST_VAL would have an accumulator that stores a score and its associated timestamp. When a new (score, timestamp) pair is accumulated, the UDAGG compares the timestamps and only updates the accumulator if the new timestamp is larger.

Btw. I'm not sure whether KStreams only updates the KTable if the update has a higher timestamp or just takes the last received record. That might be an issue with out-of-order data; I would check the behavior if you expect data with out-of-order timestamps. The upsert stream -> table conversion that we are working on will support event-time (max timestamp) or processing-time (last value) upserts.

Best,
Fabian

2018-02-21 1:06 GMT+01:00 Elias Levy <[hidden email]>:
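The LAST_VAL accumulator logic and the out-of-order caveat can be illustrated with a fold in plain Scala. This is a hypothetical model of the accumulator semantics only; an actual Flink UDAGG would put this logic in a subclass of Flink's AggregateFunction, which is not shown here. The lastReceived function models the "last value" (processing-time) behavior for comparison.

```scala
// Hypothetical plain-Scala model of the LAST_VAL accumulator described above.
object LastValModel {
  // Accumulator: the score with the largest timestamp seen so far, plus that timestamp.
  final case class LastValAcc(score: Option[Int], tstamp: Long)

  val empty: LastValAcc = LastValAcc(None, Long.MinValue)

  // Event-time semantics: replace the stored value only if the new timestamp is larger.
  def accumulate(acc: LastValAcc, score: Int, tstamp: Long): LastValAcc =
    if (acc.score.isEmpty || tstamp > acc.tstamp) LastValAcc(Some(score), tstamp) else acc

  // Folds (score, tstamp) pairs in arrival order and returns the max-timestamp score.
  def lastVal(rows: Seq[(Int, Long)]): Option[Int] =
    rows.foldLeft(empty) { case (acc, (score, ts)) => accumulate(acc, score, ts) }.score

  // Processing-time semantics: the last record received wins, whatever its timestamp.
  def lastReceived(rows: Seq[(Int, Long)]): Option[Int] =
    rows.lastOption.map(_._1)

  def main(args: Array[String]): Unit = {
    // The record with timestamp 1 arrives after the one with timestamp 2.
    val outOfOrder = Seq((1, 0L), (5, 2L), (3, 1L))
    println(lastVal(outOfOrder))      // Some(5)
    println(lastReceived(outOfOrder)) // Some(3)
  }
}
```

With in-order input both functions agree; they diverge only when a record with an older timestamp arrives late, which is exactly the case where event-time and processing-time upserts differ.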
On Wed, Feb 21, 2018 at 3:24 AM, Fabian Hueske <[hidden email]> wrote:
Fabian,

Thanks for the reply. Great to see some progress in this area. If we could implement this job in Flink rather than Kafka Streams, it would mean one less technology to support and to train our developers on, which is always a plus.
Thanks for the correction. But, yes, the indefinite accumulation is a deal breaker for using this approach.
I'll give this approach a try.
I believe you are correct. KS consumes records across partitions by attempting to align their timestamps, but it won't reorder records within a partition, which can be problematic if you can't guarantee ordered records within a partition. While I talked about KTables, in reality the job I wrote is a combination of the KS Streams DSL and the Processor API, to get around some of these issues.
Excellent.
Hi Elias,

It would be great if you could let us know if the approach works.

Btw. I should point out that the join in a query like the following is a full-history join, i.e., it keeps the rows of both inputs in state:

SELECT s.tstamp, s.item, s.score, t.source
FROM (
  SELECT item, source, MAX(tstamp) AS tstamp, LAST_VAL(score, tstamp) AS score
  FROM Telemetry
  GROUP BY item, source
) t INNER JOIN Scores s ON s.item = t.item
WHERE s.score <> t.score

Moreover, each update on the Telemetry table will change the output for all rows of Scores that are affected by the update.

You can configure a state retention time. This will clean up state per key (in the case of the join above, based on the equi-join attribute) if a key did not receive new data within the retention time.

The typical use case for full-history joins is to join two upserted or GROUP-BY-aggregated tables, i.e., tables that are updated but remain more or less constant in size.

2018-02-21 20:00 GMT+01:00 Elias Levy <[hidden email]>:
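Idle state retention can be pictured with a small plain-Scala model of one join input's state, keyed by the equi-join attribute: a key's rows are dropped once the key has been idle for longer than the retention time. This is a simplified, hypothetical model. In Flink releases of that time, retention was configured through StreamQueryConfig.withIdleStateRetentionTime(minTime, maxTime), and cleanup is timer-based, so an idle key is removed somewhere between the minimum and maximum retention time rather than at one exact threshold as modeled here.

```scala
// Hypothetical plain-Scala model of per-key idle state retention for one join input.
object IdleStateRetention {
  // Rows stored for one join key, plus the time the key last received data.
  final case class KeyState[V](rows: Vector[V], lastUpdate: Long)

  final class JoinSideState[K, V](retentionMillis: Long) {
    private var state = Map.empty[K, KeyState[V]]

    // Appends a row for the key and refreshes the key's last-update time.
    def add(key: K, row: V, now: Long): Unit = {
      val prev = state.get(key).map(_.rows).getOrElse(Vector.empty)
      state = state.updated(key, KeyState(prev :+ row, now))
    }

    // Drops every key that has been idle for longer than the retention time.
    def cleanUp(now: Long): Unit =
      state = state.filter { case (_, s) => now - s.lastUpdate <= retentionMillis }

    def rowsFor(key: K): Vector[V] = state.get(key).map(_.rows).getOrElse(Vector.empty)
    def keyCount: Int = state.size
  }

  def main(args: Array[String]): Unit = {
    val scores = new JoinSideState[String, Int](retentionMillis = 1000L)
    scores.add("item1", 1, now = 0L)
    scores.add("item2", 2, now = 900L)
    scores.cleanUp(now = 1500L) // item1 idle for 1500 ms, item2 for 600 ms
    println(scores.keyCount)         // 1
    println(scores.rowsFor("item1")) // Vector()
  }
}
```

The trade-off the model makes visible: state stays bounded, but a key that shows up again after its state was dropped starts from empty state, so join results for that key can be incomplete.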