From Kafka Stream to Flink


From Kafka Stream to Flink

Maatary Okouya
Hi, 

I have been a user of Kafka Streams so far. However, I have been faced with several limitations, in particular when performing joins on KTables.

I was wondering what the approach in Flink is to achieve (1) the concept of a KTable, i.e. a table that represents a changelog, keeping only the latest version of all keyed records, and (2) joining those tables.

There are currently a lot of limitations around that in Kafka Streams, and I need this for an ETL process where I need to mirror entire databases into Kafka and then join the tables to emit the logical entities to Kafka topics. I was hoping that I could somehow achieve this by using Flink as an intermediary.

I can see that you support all kinds of joins, but I just don't see the notion of a KTable.



Re: From Kafka Stream to Flink

miki haiat
Can you elaborate more about your use case?



Re: From Kafka Stream to Flink

Maatary Okouya
I would like to have a KTable, or maybe in Flink terms a dynamic table, that only contains the latest value for each keyed record. This would allow me to perform aggregations and joins based on the latest state of every record, as opposed to every record over time or over a period of time.


Re: From Kafka Stream to Flink

Fabian Hueske-2
Hi,

Flink does not distinguish between streams and tables. For the Table API / SQL, there are only tables that are changing over time, i.e., dynamic tables.
A Stream in the Kafka Streams or KSQL sense is, in Flink, a Table with append-only changes, i.e., records are only inserted and never deleted or modified.
A Table in the Kafka Streams or KSQL sense is, in Flink, a Table that has upsert and delete changes, i.e., the table has a unique key and records are inserted, deleted, or updated per key.

In the current version, Flink does not have native support for ingesting an upsert stream as a dynamic table (right now only append-only tables can be ingested; native support for upsert tables will be added soon).
However, you can define a view on an append-only table with the following SQL query, which produces an upsert table:

SELECT key, LAST_VAL(v1), LAST_VAL(v2), ...
FROM appendOnlyTable
GROUP BY key
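
(For illustration, a minimal sketch, added here for clarity, of how this could be wired up on Flink 1.9 with the legacy planner: the topic name, the Record POJO with fields key/v1/v2, RecordDeserializationSchema, kafkaProps, and the LastVal aggregate, which is sketched further down in this thread, are all assumptions.)

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Ingest the per-key changelog topic as a DataStream (topic, schema, and properties are placeholders).
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
DataStream<Record> changelog = env.addSource(
    new FlinkKafkaConsumer<>("users-changelog", new RecordDeserializationSchema(), kafkaProps));

// Register the stream as an append-only table ...
tableEnv.registerDataStream("appendOnlyTable", changelog, "key, v1, v2");
// ... register the LAST_VAL UDAGG (implementation sketched later in this thread) ...
tableEnv.registerFunction("LAST_VAL", new LastVal());
// ... and derive the view that keeps only the latest value per key.
Table upserted = tableEnv.sqlQuery(
    "SELECT key, LAST_VAL(v1) AS v1, LAST_VAL(v2) AS v2 FROM appendOnlyTable GROUP BY key");
tableEnv.registerTable("upsertView", upserted);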

Given this view, you can run all kinds of SQL queries on it.
However, joining an append-only table with this view without adding a temporal join condition means that the stream is fully materialized as state.
This is because previously emitted results must be updated when the view changes.
How much state the query will need to maintain really depends on the semantics of the join and query that you need.

An alternative to using the Table API / SQL and its dynamic table abstraction is to use Flink's DataStream API and ProcessFunctions.
These APIs are lower level and expose access to state and timers, which are the core ingredients for stream processing.
You can implement pretty much all the logic of KStreams, and more, in these APIs.
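
As a rough illustration of that DataStream-level approach (a sketch added for clarity, not code from this thread; the User, Order, and EnrichedOrder types and their fields are made-up placeholders), you could keep the latest value per key on both sides and re-emit the join result on every update:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class LatestValueJoin extends CoProcessFunction<User, Order, EnrichedOrder> {

    // Keyed state: latest user and latest order seen for the current key.
    private transient ValueState<User> latestUser;
    private transient ValueState<Order> latestOrder;

    @Override
    public void open(Configuration parameters) {
        latestUser = getRuntimeContext().getState(
            new ValueStateDescriptor<>("latest-user", User.class));
        latestOrder = getRuntimeContext().getState(
            new ValueStateDescriptor<>("latest-order", Order.class));
    }

    @Override
    public void processElement1(User user, Context ctx, Collector<EnrichedOrder> out) throws Exception {
        latestUser.update(user);                         // upsert the user side
        Order order = latestOrder.value();
        if (order != null) {
            out.collect(new EnrichedOrder(order, user)); // re-emit with the new user data
        }
    }

    @Override
    public void processElement2(Order order, Context ctx, Collector<EnrichedOrder> out) throws Exception {
        latestOrder.update(order);                       // upsert the order side
        User user = latestUser.value();
        if (user != null) {
            out.collect(new EnrichedOrder(order, user));
        }
    }
}

// Usage (both inputs keyed on the user id):
// users.connect(orders).keyBy(u -> u.uid, o -> o.uid).process(new LatestValueJoin());

Note that this simple version only emits new or updated results; it does not retract results that were emitted earlier.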

Best, Fabian



Re: From Kafka Stream to Flink

Maatary Okouya
Thank you for the clarification. Really appreciated. 

Is LAST_VAL part of the API?


Re: From Kafka Stream to Flink

Maatary Okouya
Fabian, 

could you please clarify the following statement: 

However, joining an append-only table with this view without adding a temporal join condition means that the stream is fully materialized as state.
This is because previously emitted results must be updated when the view changes.
How much state the query will need to maintain really depends on the semantics of the join and query that you need.


I am not sure I understand the problem. If I have two append-only tables and perform a join on them, what's the issue?



Re: From Kafka Stream to Flink

Maatary Okouya
Fabian, 

Ultimately, I just want to perform a join on the last values for each key.


Re: From Kafka Stream to Flink

Fabian Hueske-2
Hi,

LAST_VAL is not a built-in function, so you'd need to implement it as a user-defined aggregate function (UDAGG) and register it.
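
For reference, a minimal sketch of such a UDAGG for a single VARCHAR column (assuming the pre-Blink AggregateFunction interface; the class and accumulator names are made up) could look like this:

import org.apache.flink.table.functions.AggregateFunction;

// Keeps the value of the last row the aggregation has seen for a group.
public class LastVal extends AggregateFunction<String, LastVal.Acc> {

    public static class Acc {
        public String last;   // most recently seen value
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public String getValue(Acc acc) {
        return acc.last;
    }

    // Called for every input row of the group; remember the latest value.
    public void accumulate(Acc acc, String value) {
        acc.last = value;
    }
}

// Registration and use:
// tableEnv.registerFunction("LAST_VAL", new LastVal());
// tableEnv.sqlQuery("SELECT key, LAST_VAL(v1) AS v1 FROM appendOnlyTable GROUP BY key");

Keep in mind that "last" here means the last value the operator has seen, i.e. arrival order, which is usually what you want for a changelog topic with per-key ordering.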

The problem with joining an append-only table with an updating table is the following.

Consider two tables: users (uid, name, zip) and orders (oid, uid, product), with users being an updating table and orders being append-only.

On January 1st, the tables look like this:

Users:
uid_1, Fred, 12345
uid_2, Mary, 67890

Orders
oid_1, uid_1, Popcorn
oid_2, uid_2, Carrots

Joining both tables with the following query SELECT oid, product, name, zip FROM users u, orders o WHERE u.uid = o.uid results in:

oid_1, Popcorn, Fred, 12345
oid_2, Carrots, Mary, 67890

Whenever a new order is appended, we look up the corresponding user data, perform the join, and emit the result.
Let's say that by July 1st we have received 100 orders from our two users and all is fine. However, on July 2nd Fred updates his zip code because he moved to another city.
Our data now looks like this:

Users:
uid_1, Fred, 24680
uid_2, Mary, 67890

Orders
oid_1, uid_1, Popcorn
oid_2, uid_2, Carrots
....
oid_100, uid_2, Potatoes

The result of the same query as before is:

oid_1, Popcorn, Fred, 24680
oid_2, Carrots, Mary, 67890
....
oid_100, Potatoes, Mary, 67890

Notice how the first row changed?
If we strictly follow SQL semantics (which we do in Flink SQL) the query needs to update the ZIP code of the first result row.
In order to do so, we need access to the original data of the orders table, which is the append only table in our scenario.
Consequently, we need to fully materialize append only tables when they are joined with an updating table without temporal constraints.

In many situations, the intended semantics for such a query would be to join the order with the ZIP code of the user *that was valid at the time when the order was placed*.
However, this is *not* the semantics of the query in our example. For such a query, we need to model the data differently. The users table needs to store all modifications, i.e., the full history of all updates.
Each update needs a timestamp and each order needs a timestamp as well. With these timestamps, we can write a query that joins an order with the user data that was valid at the time when the order was placed.
This is the temporal constraint that I mentioned before. With this constraint, Flink can use the information about progressing time to reason about how much state it needs to keep because a change of the user table will only affect future orders.

Flink makes this a lot easier with the concept of temporal tables [1] and temporal table joins [2].
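
As an illustration, a temporal table join on the example above could look roughly like this with the 1.8/1.9 Table API (a sketch; the UsersHistory and Orders tables, their columns, and the time attributes are assumptions for the example):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.functions.TemporalTableFunction;

// UsersHistory is an append-only table with every user update and an
// event-time attribute update_time; Orders has an event-time attribute order_time.
TemporalTableFunction users = tableEnv
    .scan("UsersHistory")
    .createTemporalTableFunction("update_time", "uid"); // time attribute, primary key
tableEnv.registerFunction("Users", users);

// Join each order with the user row that was valid when the order was placed.
Table enriched = tableEnv.sqlQuery(
    "SELECT o.oid, o.product, u.name, u.zip " +
    "FROM Orders AS o, LATERAL TABLE (Users(o.order_time)) AS u " +
    "WHERE o.uid = u.uid");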

Best,
Fabian




Re: [External] Re: From Kafka Stream to Flink

ruben.casado.tejedor

Hi

 

Do you have an expected version of Flink that will include the capability to ingest an upsert stream as a dynamic table? We have such a need in our current project. What we have done is to emulate this behavior by working at a low level with state (e.g. update the existing value if the key exists, create a new value if the key does not exist). But we cannot use SQL, which would help us do it faster.

 

Our use case is many small Flink jobs that have to do something like:

 

SELECT some fields

FROM t1 INNER JOIN t2 ON t1.id = t2.id (maybe joining 3+ tables)

WHERE some conditions on fields;

 

We need the result of those queries taking into account only the last values of each row. The result is inserted/updated in an in-memory K-V database for fast access.
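
For what it's worth, a sketch of what that could look like with the LAST_VAL approach from earlier in this thread (t1, t2, id, f1, f2 stand in for the real schema, and LastVal is the user-defined aggregate sketched above):

// Deduplicate both append-only inputs to their latest value per key,
// then join the two derived upsert views.
tableEnv.registerFunction("LAST_VAL", new LastVal());
tableEnv.registerTable("t1_latest", tableEnv.sqlQuery(
    "SELECT id, LAST_VAL(f1) AS f1 FROM t1 GROUP BY id"));
tableEnv.registerTable("t2_latest", tableEnv.sqlQuery(
    "SELECT id, LAST_VAL(f2) AS f2 FROM t2 GROUP BY id"));

// Plus whatever WHERE conditions you need on the fields.
Table result = tableEnv.sqlQuery(
    "SELECT a.id, a.f1, b.f2 " +
    "FROM t1_latest a INNER JOIN t2_latest b ON a.id = b.id");

// The join result is an updating table, so writing it to the in-memory K-V store
// requires a retract/upsert-capable sink, e.g. via toRetractStream plus a custom sink.

This does not replace native upsert ingestion, but it keeps the logic in SQL.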

 

Thanks in advance!

 

Best

 


Re: [External] Re: From Kafka Stream to Flink

Fabian Hueske-2
Hi Ruben,

Work on this feature has already started [1], but it has stalled a bit (probably due to the effort of merging the new Blink query processor).
Hequn (in CC) is working on upsert table ingestion; maybe he can share the status of this feature.

Best, Fabian



Re: [External] Re: From Kafka Stream to Flink

ruben.casado.tejedor

Thanks Fabian. [hidden email] Could you share the status? Thanks for your amazing work!

 


Re: [External] Re: From Kafka Stream to Flink

Hequn Cheng
Hi,

Fabian is totally right. Big thanks for the detailed answers and nice examples above.

As for the PR, I'm very sorry about the delay. It is mainly because of the merge of Blink and my work switching to Flink Python recently.
However, I think later versions of the Blink planner will cover this feature natively with further merges.

Before that, I think we can use the solution Fabian provided above. 

There are some examples here [1][2] which may be helpful to you, @Casado and @Maatary.
In [1], the test case closely matches your scenario (a join performed after group by + last_value). It also provides the UDAF you want and shows how to register it.
In [2], the test shows how to use the built-in LAST_VALUE function in SQL. Note that the built-in LAST_VALUE UDAF is only supported by the Blink planner from Flink 1.9.0 on. If you are using the Flink planner (or an earlier version), you can register the LAST_VALUE UDAF with the TableEnvironment as shown in [1].
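
For example, with the Blink planner on Flink 1.9 the deduplication view from earlier in this thread could be written without registering a custom UDAF (a sketch; table and column names as in the earlier examples):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// LAST_VALUE is built into the Blink planner, so no UDAF registration is needed.
Table latest = tableEnv.sqlQuery(
    "SELECT key, LAST_VALUE(v1) AS v1, LAST_VALUE(v2) AS v2 " +
    "FROM appendOnlyTable GROUP BY key");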

Feel free to ask if there are other problems.

Best, Hequn

On Thu, Sep 19, 2019 at 9:40 PM Casado Tejedor, Rubén <[hidden email]> wrote:

Thanks Fabian. [hidden email] Could you share the status? Thanks for your amazing work!

 

De: Fabian Hueske <[hidden email]>
Fecha: viernes, 16 de agosto de 2019, 9:30
Para: "Casado Tejedor, Rubén" <[hidden email]>
CC: Maatary Okouya <[hidden email]>, miki haiat <[hidden email]>, user <[hidden email]>, Hequn Cheng <[hidden email]>
Asunto: Re: [External] Re: From Kafka Stream to Flink

 

Hi Ruben,

 

Work on this feature has already started [1], but stalled a bit (probably due to the effort of merging the new Blink query processor).

Hequn (in CC) is the guy working on upsert table ingestion, maybe he can share what the status of this feature is.

 

Best, Fabian

 

 

Am Di., 13. Aug. 2019 um 11:05 Uhr schrieb Casado Tejedor, Rubén <[hidden email]>:

Hi

 

Do you have an expected version of Flink to include the capability to ingest an upsert stream as a dynamic table? We have such need in our current project. What we have done is to emulate such behavior working at low level with states (e.g. update existing value if key exists, create a new value if key does not exist). But we cannot use SQL that would help as to do it faster.

 

Our use case is many small flink jobs that have to something like:

 

SELECT some fields

FROM t1 INNER JOIN t1 on t1.id = t2.id (maybe join +3 tables)

WHERE some conditions on fields;

 

We need the result of that queries taking into account only the last values of each row. The result is inserted/updated in a in-memory K-V database for fast access.

 

Thanks in advance!

 

Best

 

De: Fabian Hueske <[hidden email]>
Fecha: miércoles, 7 de agosto de 2019, 11:08
Para: Maatary Okouya <[hidden email]>
CC: miki haiat <[hidden email]>, user <[hidden email]>
Asunto: [External] Re: From Kafka Stream to Flink

 

This message is from an EXTERNAL SENDER - be CAUTIOUS, particularly with links and attachments.


 

Hi,

 

LAST_VAL is not a built-in function, so you'd need to implement it as a user-defined aggregate function (UDAGG) and register it.

 

The problem with joining an append only table with an updating table is the following.

 

Consider two tables: users (uid, name, zip) and orders (oid, uid, product), with user being an updating table and orders being append only.

 

On January 1st, the tables look like this:

 

Users:

uid_1, Fred, 12345

uid_2, Mary, 67890

 

Orders

oid_1, uid_1, Popcorn

oid_2, uid_2, Carrots

 

Joining both tables with the following query SELECT oid, product, name, zip FROM users u, orders o WHERE u.uid = o.uid results in:

 

oid_1, Popcorn, Fred, 12345

oid_2, Carrots, Mary, 67890

 

Whenever, a new order is appended, we look up the corresponding user data, perform the join and emit the results.

Let's say on July 1st we have received 100 orders from our two users all is fine. However, on July 2nd Fred updates his zip code because he moved to another city.

Our data now looks like this:

 

Users:

uid_1, Fred, 24680

uid_2, Mary, 67890

 

Orders

oid_1, uid_1, Popcorn

oid_2, uid_2, Carrots

....

oid_100, uid_2, Potatoes

 

The result of the same query as before is:

 

oid_1, Popcorn, Fred, 24680

oid_2, Carrots, Mary, 67890

....

oid_100, Potatoes, Mary, 67890

 

Notice how the first row changed?

If we strictly follow SQL semantics (which we do in Flink SQL) the query needs to update the ZIP code of the first result row.

In order to do so, we need access to the original data of the orders table, which is the append only table in our scenario.

Consequently, we need to fully materialize append only tables when they are joined with an updating table without temporal constraints.

 

In many situations, the intended semantics for such a query would be to join the order with the ZIP code of the user *that was valid at the time when the order was placed*.

However, this is *not* the semantics of the query in our example. For such a query, we need to model the data differently. The users table needs to store all modifications, i.e., the full history of all updates.

Each update needs a timestamp and each order needs a timestamp as well. With these timestamps, we can write a query that joins an order with the user data that was valid at the time when the order was placed.

This is the temporal constraint that I mentioned before. With this constraint, Flink can use the information about progressing time to reason about how much state it needs to keep, because a change to the users table will only affect future orders.

 

Flink makes this a lot easier with the concept of temporal tables [1] and temporal table joins [2].
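
For illustration, a sketch of how the users/orders example could be expressed with a temporal table function. It assumes both tables are already registered and carry event-time attributes; the update_time and order_time columns are assumptions added for the example:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

public class TemporalJoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumed to be registered: "Users" (uid, name, zip, update_time) and
        // "Orders" (oid, uid, product, order_time), where the *_time columns
        // are event-time attributes.
        Table users = tEnv.sqlQuery("SELECT uid, name, zip, update_time FROM Users");

        // Version the users table by update_time, keyed on uid.
        TemporalTableFunction usersHistory =
            users.createTemporalTableFunction("update_time", "uid");
        tEnv.registerFunction("UsersHistory", usersHistory);

        // Each order joins the user row that was valid at order_time, so a later
        // change of Fred's ZIP code does not rewrite previously emitted results.
        Table enrichedOrders = tEnv.sqlQuery(
            "SELECT o.oid, o.product, u.name, u.zip " +
            "FROM Orders AS o, LATERAL TABLE (UsersHistory(o.order_time)) AS u " +
            "WHERE o.uid = u.uid");

        // An event-time temporal join produces append-only results.
        tEnv.toAppendStream(enrichedOrders, Row.class).print();
        env.execute("temporal join sketch");
    }
}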

 

Best,

Fabian

 

 

 

On Tue, Aug 6, 2019 at 9:09 PM, Maatary Okouya <[hidden email]> wrote:

Fabian, 

 

Ultimately, I just want to perform a join on the last values for each key. 

 

On Tue, Aug 6, 2019 at 8:07 PM Maatary Okouya <[hidden email]> wrote:

Fabian, 

 

could you please clarify the following statement: 

 

However, joining an append-only table with this view without adding a temporal join condition means that the stream is fully materialized as state.

This is because previously emitted results must be updated when the view changes.

How much state the query will need to maintain really depends on the semantics of the join and the query that you need.

 

 

I am not sure I understand the problem. If I have two append-only tables and perform some join on them, what's the issue?

 

 

On Tue, Aug 6, 2019 at 8:03 PM Maatary Okouya <[hidden email]> wrote:

Thank you for the clarification. Really appreciated. 

 

Is LAST_VAL part of the API?

 

On Fri, Aug 2, 2019 at 10:49 AM Fabian Hueske <[hidden email]> wrote:

Hi,

 

Flink does not distinguish between streams and tables. For the Table API / SQL, there are only tables that are changing over time, i.e., dynamic tables.

A Stream in the Kafka Streams or KSQL sense, is in Flink a Table with append-only changes, i.e., records are only inserted and never deleted or modified.

A Table in the Kafka Streams or KSQL sense, is in Flink a Table that has upsert and delete changes, i.e., the table has a unique key and records are inserted, deleted, or updated per key.

 

In the current version, Flink does not have native support to ingest an upsert stream as a dynamic table (right now only append-only tables can be ingested; native support for upsert tables will be added soon).

However, you can create a view over an append-only table with the following SQL query, which yields an upsert table:

 

SELECT key, LAST_VAL(v1), LAST_VAL(v2), ...

FROM appendOnlyTable

GROUP BY key

 

Given this view, you can run all kinds of SQL queries on it.

However, joining an append-only table with this view without adding a temporal join condition means that the stream is fully materialized as state.

This is because previously emitted results must be updated when the view changes.

How much state the query will need to maintain really depends on the semantics of the join and the query that you need.

 

An alternative to using the Table API / SQL and its dynamic table abstraction is to use Flink's DataStream API and ProcessFunctions.

These APIs are more low level and expose access to state and timers, which are the core ingredients for stream processing.

You can implement pretty much all logic of KStreams and more in these APIs.
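
As a rough sketch (types and names invented for illustration), a KeyedProcessFunction that keeps only the latest value per key, i.e. the DataStream-level equivalent of a KTable-like changelog:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Input and output are (key, value) pairs; the String value type is only for the sketch.
public class LatestValuePerKey
        extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>> {

    private transient ValueState<String> latest;

    @Override
    public void open(Configuration parameters) {
        latest = getRuntimeContext().getState(
            new ValueStateDescriptor<>("latest-value", String.class));
    }

    @Override
    public void processElement(Tuple2<String, String> record,
                               Context ctx,
                               Collector<Tuple2<String, String>> out) throws Exception {
        latest.update(record.f1);   // upsert: overwrite the previous value for this key
        out.collect(record);        // emit the change, i.e. an update of the "table"
    }
}

It would be applied as stream.keyBy(r -> r.f0).process(new LatestValuePerKey()); joining two such streams on their latest values would typically be done by connecting them and keeping one ValueState per side in a CoProcessFunction.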

 

Best, Fabian

 

 


Reply | Threaded
Open this post in threaded view
|

Re: [External] Re: From Kafka Stream to Flink

Maatary Okouya
Hi all, 

Just wondering what is the status at this point? 

On Thu, Sep 19, 2019 at 4:38 PM Hequn Cheng <[hidden email]> wrote:
Hi,

Fabian is totally right. Big thanks to the detailed answers and nice examples above.

As for the PR, very sorry about the delay. It is mainly because of the merge of Blink and my work switching to Flink Python recently. 
However, I think later versions of Blink will cover this feature natively with further merges. 

Before that, I think we can use the solution Fabian provided above. 

There are some examples here [1][2] which may be helpful to you, @Casado @Maatary.
In [1], the test case closely matches your scenario (a join performed after GROUP BY + last_value). It also provides the UDAF you want and shows how to register it.
In [2], the test shows how to use the built-in last_value in SQL. Note that the built-in last_value UDAF is only supported in the Blink planner from Flink 1.9.0. If you are using the old Flink planner (or an earlier version), you can register the last_value UDAF with the TableEnvironment as shown in [1].
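
For example, a minimal sketch of the built-in variant on the Blink planner (Flink 1.9+). The table and column names are made up, and the append-only source table is assumed to be registered elsewhere:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class BlinkLastValueSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Select the Blink planner, which ships LAST_VALUE as a built-in aggregate.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Assumes an append-only table "appendOnlyTable" with columns (key, v1, v2)
        // has been registered on tEnv, e.g. from a Kafka source. Backticks escape
        // the column name in case it collides with a reserved keyword.
        Table latestPerKey = tEnv.sqlQuery(
            "SELECT `key`, LAST_VALUE(v1) AS v1, LAST_VALUE(v2) AS v2 " +
            "FROM appendOnlyTable GROUP BY `key`");
        tEnv.registerTable("latestPerKey", latestPerKey);
    }
}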

Feel free to ask if there are other problems.

Best, Hequn


Reply | Threaded
Open this post in threaded view
|

Re: [External] Re: From Kafka Stream to Flink

Kurt Young
I think this requirement can be satisfied by the temporal table function [1]; am I missing anything?


Best,
Kurt

