Hello,

Working with Flink 1.12.1, I read in the docs that the processing-time temporal join is supported for key-value-like joins, but when I try it I get:

Exception in thread "main" org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)

my query:

my s3 table:

CREATE TABLE s3Table (
    id STRING,
    test STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'filesystem',
    'path' = 's3a://fs/',
    'format' = 'json'
)

my kafka table:

CREATE TABLE kafkaTable (
    id STRING,
    foo STRING,
    bar BIGINT,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'mytopic',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'properties.group.id' = 'mygroup',
    'format' = 'json',
    'scan.startup.mode' = 'group-offsets',
    'properties.enable.auto.commit' = 'false'
)
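(The SELECT statement itself was not included in the mail; given the two table definitions, the query that triggers this exception would presumably be a processing-time temporal join along these lines. This is a hypothetical reconstruction, not the original query:)

```sql
-- Hypothetical reconstruction: enrich each Kafka record with the s3 row
-- matching its id, as of processing time.
SELECT k.id, k.foo, k.bar, s.test
FROM kafkaTable AS k
JOIN s3Table FOR SYSTEM_TIME AS OF k.proctime AS s
ON k.id = s.id;
```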
Hi Eric,

It looks like you ran into FLINK-19830 [1]. I'm adding Leonard to the thread; maybe he has a workaround for your case.

Best,
Matthias

On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <[hidden email]> wrote:
Hi Eric,

In FLINK-19830 we planned to support processing-time temporal joins against any table/view by looking up the data in the join operator's state, populated by scanning the filesystem table. But as the issue describes, the join processing for the left stream does not wait for a complete snapshot of the temporal table, which can mislead users in a production environment. For example, if your s3 table has 1000 records, the join operator does not know when all of them have arrived, so the correlation may be incorrect; that is why we disabled this feature.

I think we could implement LookupTableSource for FileSystemTableSource; after that, we could directly look up a filesystem table. The implementation would be similar to the Hive table, where we cache all the data from the files and then look it up in the cache. Could you help create a JIRA ticket for this?

Best,
Leonard
Hi Leonard,

Thanks for your reply. No problem helping with the JIRA ticket. In my situation, in a pure SQL environment, what would be the best workaround to enrich a stream of data from a Kafka topic with static data based on an id? I know how to do it at the stream level.

Eric

On Sat, Feb 27, 2021 at 05:15, Leonard Xu <[hidden email]> wrote:
Hi Eric,

As a workaround, you can currently put your static data in Hive/JDBC/HBase, which support looking up the data in a pure Table/SQL environment. You can also write a UDF that caches the s3 files and use it to enrich your stream data.

Best,
Leonard
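To make the JDBC workaround concrete, a processing-time lookup join against a JDBC dimension table could look roughly like this. The table name, connection URL, and columns below are illustrative assumptions, not from the original thread:

```sql
-- Hypothetical JDBC dimension table holding the static data
CREATE TABLE dimTable (
    id STRING,
    test STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydb',  -- assumed connection URL
    'table-name' = 'dim_table'                   -- assumed source table name
);

-- Lookup join: each Kafka record probes the JDBC table at processing time
SELECT k.id, k.foo, k.bar, d.test
FROM kafkaTable AS k
JOIN dimTable FOR SYSTEM_TIME AS OF k.proctime AS d
ON k.id = d.id;
```

Unlike the filesystem connector in 1.12, the JDBC connector implements LookupTableSource, which is why this form of the join is accepted by the planner.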
Thanks Leonard. By UDF, do you mean a custom table source on s3?

On Thu, Mar 4, 2021 at 05:31, Leonard Xu <[hidden email]> wrote:
Sorry, I meant that you can create a UDTF that caches the data from your files and then enrich your stream with the LATERAL TABLE syntax.

BTW, you can use FileSystemLookupFunction.java [1] as a reference. If we add lookup support for the filesystem connector, we should use this function too.

Best,
Leonard
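As a sketch of this suggestion: assuming a user-defined table function implemented in Java that loads and caches the s3 files and emits the row(s) matching a given id (the class name com.example.S3LookupFunction and the function name s3_lookup below are hypothetical), the SQL side could look like:

```sql
-- Register the hypothetical UDTF (a Java TableFunction that caches the
-- s3 file contents and emits the matching rows for an id)
CREATE FUNCTION s3_lookup AS 'com.example.S3LookupFunction';

-- Enrich the Kafka stream via LATERAL TABLE: the function is invoked
-- per input row with that row's id
SELECT k.id, k.foo, k.bar, t.test
FROM kafkaTable AS k,
LATERAL TABLE(s3_lookup(k.id)) AS t(test);
```

The caching inside the function is what makes this viable: the s3 files are read once (or refreshed on an interval you choose) rather than on every incoming record.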