Processing-time temporal join is not supported yet.

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Processing-time temporal join is not supported yet.

eric hoffmann
Hello
Working with Flink 1.12.1, I read in the docs that processing-time temporal join is supported for kv-like joins, but when I try it I get:

Exception in thread "main" org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)

my query:

SELECT e.id, r.test FROM kafkaTable as e JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r ON e.id = r.id

my s3 table:

CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
      WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')

my kafka table:

CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, proctime AS PROCTIME())
      WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092','properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets', 'properties.enable.auto.commit'='false')

Reply | Threaded
Open this post in threaded view
|

Re: Processing-time temporal join is not supported yet.

Matthias
Hi Eric,
it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the thread. Maybe, he has a workaround for your case.

Best,
Matthias


On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <[hidden email]> wrote:
Hello
Working with flink 1.12.1 i read in the doc that Processing-time temporal join is supported for kv like join but when i try i get a:

Exception in thread "main" org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)

my query:

SELECT e.id, r.test FROM kafkaTable as e JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r ON e.id = r.id

my s3 table:

CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
      WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')

my kafka table:

CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, proctime AS PROCTIME())
      WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092','properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets', 'properties.enable.auto.commit'='false')


Reply | Threaded
Open this post in threaded view
|

Re: Processing-time temporal join is not supported yet.

Leonard Xu
Hi, Eric

Firstly, FileSystemTableSource does not implement LookupTableSource, which means we cannot directly look up a Filesystem table.

In FLINK-19830, we plan to support processing-time temporal joins on any table/view by looking up the data in the join operator state, which is scanned from the filesystem table. But as the issue describes, join processing for the left stream doesn't wait for the complete snapshot of the temporal table, which may mislead users in a production environment.
E.g.: your s3 table has 1000 records, but the join operator does not know when all records have arrived, so the correlation may be incorrect; thus we disabled this feature.

I think we can implement LookupTableSource for FileSystemTableSource for now; after that, we can directly look up a Filesystem table. The implementation will be similar to the Hive table, where we cache all data of the files and then look up the cache. Could you help create a JIRA ticket for this?


Best,
Leonard 


在 2021年2月26日,23:41,Matthias Pohl <[hidden email]> 写道:

Hi Eric,
it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the thread. Maybe, he has a workaround for your case.

Best,
Matthias


On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <[hidden email]> wrote:
Hello
Working with flink 1.12.1 i read in the doc that Processing-time temporal join is supported for kv like join but when i try i get a:

Exception in thread "main" org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)

my query:

SELECT e.id, r.test FROM kafkaTable as e JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r ON e.id = r.id

my s3 table:

CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
      WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')

my kafka table:

CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, proctime AS PROCTIME())
      WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092','properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets', 'properties.enable.auto.commit'='false')



Reply | Threaded
Open this post in threaded view
|

Re: Processing-time temporal join is not supported yet.

eric hoffmann
Hi Leonard,
Thx for your reply,
No problem helping with the JIRA ticket.
In my situation, in a full SQL environment, what would be the best workaround to enrich a stream of data from a Kafka topic with static data based on id?
I know how to do it in stream.
eric

Le sam. 27 févr. 2021 à 05:15, Leonard Xu <[hidden email]> a écrit :
Hi, Eric

Firstly FileSystemTableSource doe not implement LookupTableSource which means we cannot directly lookup a Filesystem table.

In FLINK-19830, we plan to support Processing-time temporal join any table/views by lookup the data in join operator state which scanned from the filesystem table, but as the issue described: join processing for left stream doesn't wait for the complete snapshot of temporal table, this may mislead users in production environment.  
Eg: your s3 table has 1000 records, but the join operator does not know when all records has been arrived, the correlation maybe incorrect, thus we disable this feature.

I think we can  implement LookupTableSource for  FileSystemTableSource currently, after that, we can directly lookup a Filesystem table, the implementation will be similar to Hive table where we cache all data of the files and then lookup the cache.  Could you help create an JIRA ticket for this?


Best,
Leonard 


在 2021年2月26日,23:41,Matthias Pohl <[hidden email]> 写道:

Hi Eric,
it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the thread. Maybe, he has a workaround for your case.

Best,
Matthias


On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <[hidden email]> wrote:
Hello
Working with flink 1.12.1 i read in the doc that Processing-time temporal join is supported for kv like join but when i try i get a:

Exception in thread "main" org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)

my query:

SELECT e.id, r.test FROM kafkaTable as e JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r ON e.id = r.id

my s3 table:

CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
      WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')

my kafka table:

CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, proctime AS PROCTIME())
      WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092','properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets', 'properties.enable.auto.commit'='false')



Reply | Threaded
Open this post in threaded view
|

Re: Processing-time temporal join is not supported yet.

Leonard Xu
Hi, Eric

what will be the best workaround to enrich stream of data from a kafka topics with statical data based on id?
Currently, as a workaround, you can put your static data in Hive/JDBC/HBase, which support looking up the data in a full table environment.
You can also write a UDF which caches the s3 files that can be used to enrich your stream data.

Best,
Leonard



Le sam. 27 févr. 2021 à 05:15, Leonard Xu <[hidden email]> a écrit :
Hi, Eric

Firstly FileSystemTableSource doe not implement LookupTableSource which means we cannot directly lookup a Filesystem table.

In FLINK-19830, we plan to support Processing-time temporal join any table/views by lookup the data in join operator state which scanned from the filesystem table, but as the issue described: join processing for left stream doesn't wait for the complete snapshot of temporal table, this may mislead users in production environment.  
Eg: your s3 table has 1000 records, but the join operator does not know when all records has been arrived, the correlation maybe incorrect, thus we disable this feature.

I think we can  implement LookupTableSource for  FileSystemTableSource currently, after that, we can directly lookup a Filesystem table, the implementation will be similar to Hive table where we cache all data of the files and then lookup the cache.  Could you help create an JIRA ticket for this?


Best,
Leonard 


在 2021年2月26日,23:41,Matthias Pohl <[hidden email]> 写道:

Hi Eric,
it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the thread. Maybe, he has a workaround for your case.

Best,
Matthias


On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <[hidden email]> wrote:
Hello
Working with flink 1.12.1 i read in the doc that Processing-time temporal join is supported for kv like join but when i try i get a:

Exception in thread "main" org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)

my query:

SELECT e.id, r.test FROM kafkaTable as e JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r ON e.id = r.id

my s3 table:

CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
      WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')

my kafka table:

CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, proctime AS PROCTIME())
      WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092','properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets', 'properties.enable.auto.commit'='false')




Reply | Threaded
Open this post in threaded view
|

Re: Processing-time temporal join is not supported yet.

eric hoffmann
Thx Leonard,
by UDF you mean a custom table source on s3?

Le jeu. 4 mars 2021 à 05:31, Leonard Xu <[hidden email]> a écrit :
Hi, Eric

what will be the best workaround to enrich stream of data from a kafka topics with statical data based on id?
Currently you can put your statical data in Hive/JDBC/HBase which supports lookup the data in full table env as a workaround,.
You can also write a UDF which caches the s3 files that can be used to enrich your stream data.

Best,
Leonard



Le sam. 27 févr. 2021 à 05:15, Leonard Xu <[hidden email]> a écrit :
Hi, Eric

Firstly FileSystemTableSource doe not implement LookupTableSource which means we cannot directly lookup a Filesystem table.

In FLINK-19830, we plan to support Processing-time temporal join any table/views by lookup the data in join operator state which scanned from the filesystem table, but as the issue described: join processing for left stream doesn't wait for the complete snapshot of temporal table, this may mislead users in production environment.  
Eg: your s3 table has 1000 records, but the join operator does not know when all records has been arrived, the correlation maybe incorrect, thus we disable this feature.

I think we can  implement LookupTableSource for  FileSystemTableSource currently, after that, we can directly lookup a Filesystem table, the implementation will be similar to Hive table where we cache all data of the files and then lookup the cache.  Could you help create an JIRA ticket for this?


Best,
Leonard 


在 2021年2月26日,23:41,Matthias Pohl <[hidden email]> 写道:

Hi Eric,
it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the thread. Maybe, he has a workaround for your case.

Best,
Matthias


On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <[hidden email]> wrote:
Hello
Working with flink 1.12.1 i read in the doc that Processing-time temporal join is supported for kv like join but when i try i get a:

Exception in thread "main" org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)

my query:

SELECT e.id, r.test FROM kafkaTable as e JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r ON e.id = r.id

my s3 table:

CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
      WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')

my kafka table:

CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, proctime AS PROCTIME())
      WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092','properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets', 'properties.enable.auto.commit'='false')




Reply | Threaded
Open this post in threaded view
|

Re: Processing-time temporal join is not supported yet.

Leonard Xu
Sorry, I mean you can create a UDTF in which you cache data from your files, and then enrich your stream using the LATERAL TABLE syntax.

BTW, you can refer to FileSystemLookupFunction.java [1]. If we plan to support lookup for the filesystem connector, we should use this function too.

Best,
Leonard


 

在 2021年3月4日,19:26,eric hoffmann <[hidden email]> 写道:

Thx Leonard,
by UDF you mean a custom table source on s3?

Le jeu. 4 mars 2021 à 05:31, Leonard Xu <[hidden email]> a écrit :
Hi, Eric

what will be the best workaround to enrich stream of data from a kafka topics with statical data based on id?
Currently you can put your statical data in Hive/JDBC/HBase which supports lookup the data in full table env as a workaround,.
You can also write a UDF which caches the s3 files that can be used to enrich your stream data.

Best,
Leonard



Le sam. 27 févr. 2021 à 05:15, Leonard Xu <[hidden email]> a écrit :
Hi, Eric

Firstly FileSystemTableSource doe not implement LookupTableSource which means we cannot directly lookup a Filesystem table.

In FLINK-19830, we plan to support Processing-time temporal join any table/views by lookup the data in join operator state which scanned from the filesystem table, but as the issue described: join processing for left stream doesn't wait for the complete snapshot of temporal table, this may mislead users in production environment.  
Eg: your s3 table has 1000 records, but the join operator does not know when all records has been arrived, the correlation maybe incorrect, thus we disable this feature.

I think we can  implement LookupTableSource for  FileSystemTableSource currently, after that, we can directly lookup a Filesystem table, the implementation will be similar to Hive table where we cache all data of the files and then lookup the cache.  Could you help create an JIRA ticket for this?


Best,
Leonard 


在 2021年2月26日,23:41,Matthias Pohl <[hidden email]> 写道:

Hi Eric,
it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the thread. Maybe, he has a workaround for your case.

Best,
Matthias


On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <[hidden email]> wrote:
Hello
Working with flink 1.12.1 i read in the doc that Processing-time temporal join is supported for kv like join but when i try i get a:

Exception in thread "main" org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)

my query:

SELECT e.id, r.test FROM kafkaTable as e JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r ON e.id = r.id

my s3 table:

CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
      WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')

my kafka table:

CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, proctime AS PROCTIME())
      WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092','properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets', 'properties.enable.auto.commit'='false')