Flink SQL - Join Lookup Table

8 messages
Flink SQL - Join Lookup Table

Kelly Smith

Hi folks,

I have a question about Flink SQL. What I want to do is this:

  • Join a simple lookup table (a few rows) to a stream of data to enrich the stream by adding a column from the lookup table.

For example, a simple lookup table:

CREATE TABLE LookupTable (
    `computeClass`  STRING,
    `multiplier`    FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'fpu-multipliers.csv',
    'format' = 'csv'
)

 

 

And I’ve got a Kafka connector table with rowtime semantics that has a `computeClass` field. I simply want to join it (in a streaming fashion) against the lookup table to pull in the `multiplier` field above.

 

SELECT
    `timestamp`,
    -- ...
    ks.computeClass,
    lt.`multiplier`
FROM KafkaStream ks
JOIN LookupTable lt ON ks.computeClass = lt.computeClass
 

Doing a simple join like that gives me this error:

“org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.”

This leads me to believe that I should use an interval join instead, but that doesn’t seem appropriate, since my table is static and has no concept of time. Basically, I want to hold the entire lookup table in memory and simply enrich the Kafka stream (which need not be held in memory).

Any ideas on how to accomplish what I’m trying to do?

 

Thanks!

Kelly


Re: Flink SQL - Join Lookup Table

godfrey he
Hi Kelly,
As the exception message says, you currently have to cast the time attribute to a regular TIMESTAMP type before you can do a regular join. The reason is that the time attribute may be out of order after a regular join, so a window aggregate based on that time attribute would no longer be possible.

We could improve this: the planner could implicitly cast the time attribute to a regular TIMESTAMP type, and throw an exception only if some operator after the join (such as a window aggregate) depends on the time attribute.

I will create a JIRA issue to track this.

Best,
Godfrey
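
Godfrey's cast workaround could be sketched roughly as follows (a hedged sketch: the view name `KafkaStreamPlain` and the `TIMESTAMP(3)` precision are assumptions for illustration, not taken from the thread):

```sql
-- Project the rowtime attribute to a plain TIMESTAMP before the join,
-- so the regular join sees no time attribute in its inputs.
CREATE VIEW KafkaStreamPlain AS
SELECT
    CAST(`timestamp` AS TIMESTAMP(3)) AS ts,
    computeClass
FROM KafkaStream;

SELECT
    ks.ts,
    ks.computeClass,
    lt.`multiplier`
FROM KafkaStreamPlain ks
JOIN LookupTable lt ON ks.computeClass = lt.computeClass
```

The cost of this workaround is that the join output carries no time attribute, so event-time operations downstream of the join are no longer possible.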

Kelly Smith <[hidden email]> wrote on Tue, Jul 21, 2020 at 6:38 AM:




Re: Flink SQL - Join Lookup Table

Danny Chan
In reply to this post by Kelly Smith
It seems you want a temporal table join instead of a two-stream join. If that is what you need, you should use the syntax:

JOIN LookupTable FOR SYSTEM_TIME AS OF …

See [1] for details.


Best,
Danny Chan
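
Spelled out, Danny's suggestion might look like this (a hedged sketch: it assumes `KafkaStream` declares a processing-time column, e.g. `proctime AS PROCTIME()`, which the DDL in this thread does not show):

```sql
-- Lookup (processing-time temporal) join: each incoming row is
-- enriched against the lookup table as of its processing time.
SELECT
    ks.`timestamp`,
    ks.computeClass,
    lt.`multiplier`
FROM KafkaStream ks
JOIN LookupTable FOR SYSTEM_TIME AS OF ks.proctime AS lt
    ON ks.computeClass = lt.computeClass
```

Note that this form requires the right-hand table's connector to support being used as a lookup table.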
On Jul 21, 2020 at 6:32 AM +0800, Kelly Smith <[hidden email]> wrote:



Re: Flink SQL - Join Lookup Table

Leonard Xu
Hi Kelly,

It looks like you want to join a fact table (from Kafka) against a dimension table (from the filesystem). A dimension table is one kind of temporal table; for the temporal table join syntax, see Danny's post above.

However, `FileSystemTableSource` does not implement the `LookupTableSource` interface yet, which means you cannot use it as a dimension table. The connectors that support `LookupTableSource` today are JDBC, HBase, and Hive; you could create an issue to request `LookupTableSource` support for the filesystem connector.

Another approach is to use a temporal table function [2], which can define a temporal table from a DataStream: convert your filesystem table to a stream, create a temporal table function from it, and then join against that temporal table.

Best,
Leonard Xu
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table-function
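
The temporal-table-function approach might look roughly like this: the function is registered via the Table API (Java/Scala), after which the join itself can be written in SQL. Everything here is a hedged sketch; the function name `Multipliers` and the registration call in the comments are made up for illustration and not taken from the thread.

```sql
-- Assumes a temporal table function `Multipliers` has already been
-- registered over the (converted) filesystem table via the Table API,
-- e.g. with lookupTable.createTemporalTableFunction(...) and
-- tableEnv.registerFunction("Multipliers", ...) -- hypothetical names,
-- see the temporal table function docs linked above.
SELECT
    ks.`timestamp`,
    ks.computeClass,
    m.`multiplier`
FROM KafkaStream ks,
LATERAL TABLE (Multipliers(ks.rowtime)) AS m
WHERE ks.computeClass = m.computeClass
```
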





Re: Flink SQL - Join Lookup Table

Kelly Smith

Thanks Leonard and Danny,

This makes a lot of sense. My hope here is to use only SQL, without any specialized Java/Scala code, so it seems it may not be possible to use either of these methods yet.

I’ll open an issue for the LookupTableSource implementation, and look into the workaround you suggested in the short term.

Thanks!
Kelly

 


 


Re: Flink SQL - Join Lookup Table

Jingsong Li
Hi Kelly,

There are existing JIRA issues tracking this:
- Filesystem support single file reading: https://issues.apache.org/jira/browse/FLINK-17398
- Filesystem support LookupJoin: https://issues.apache.org/jira/browse/FLINK-17397

Best,
Jingsong

On Wed, Jul 22, 2020 at 3:13 AM Kelly Smith <[hidden email]> wrote:


 



--
Best, Jingsong Lee

Re: Flink SQL - Join Lookup Table

Jark Wu-3
In reply to this post by Kelly Smith
Hi Kelly,

As a simple workaround, you can remove the watermark definition from `KafkaStream`; that way the stream-stream join will not fail with the "Rowtime attributes" exception.

Best,
Jark
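
Jark's workaround amounts to defining the Kafka table without a WATERMARK clause. A hedged sketch (the thread never shows the original `KafkaStream` DDL, so the topic, server, and format values below are made up for illustration):

```sql
CREATE TABLE KafkaStream (
    `timestamp`     TIMESTAMP(3),
    `computeClass`  STRING
    -- no WATERMARK clause: `timestamp` stays a plain TIMESTAMP column
    -- rather than a rowtime attribute, so the regular join is allowed
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
)
```

The trade-off is that without a watermark, event-time operations (window aggregates, interval joins) are no longer available on this table or on the join result.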

On Wed, 22 Jul 2020 at 03:13, Kelly Smith <[hidden email]> wrote:
