Hello, I have registered some tables in my table environment and am now trying to query them using LEFT JOIN, as in the example below:
    Table fullenrichment = tenv.sqlQuery(
        "SELECT pp.a, pp.b, pp.c, pp.d, pp.a " +
        " FROM t1 pp LEFT JOIN t2 ent" +
        " ON pp.b = ent.b" +
        " LEFT JOIN t3 act " +
        " ON pp.a = act.a ");

Once the query is defined, I need to convert the result into a DataStream<Row>:

    DataStream<Row> results = tenv.toAppendStream(fullenrichment, Row.class);

I'm getting the error below. However, if I run the same code with INNER JOIN instead of LEFT JOIN, the error disappears and the code works. Why does this happen?

    1930 [main] INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Flink Kinesis Consumer is going to read the following streams: tr-stream-ingestion,
    3698 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a getter for field fields
    3698 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a setter for field fields
    3698 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
    3730 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a getter for field fields
    3730 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a setter for field fields
    3730 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
    3753 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a getter for field fields
    3753 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a setter for field fields
    3753 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
    Exception in thread "main" org.apache.flink.table.api.TableException: Table is not an append-only table. Use the toRetractStream() in order to handle add and retract messages.
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:920)
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:896)
        at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308)
        at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262)
        at consumer.trconsumer.main(trconsumer.java:180)
Hi,

this is because outer joins generate retractions. Consider the behavior of a left outer join:

1. A left record arrives and there is no matching right record yet, so +(left, null) is emitted.
2. A matching right record arrives. The previous result must now be retracted, so -(left, null) and +(left, right) are emitted.

On Thu, Jul 25, 2019 at 8:15 AM, Andres Angel <[hidden email]> wrote:
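To make the two steps of the left outer join concrete, here is a minimal plain-Java sketch (no Flink dependency) that models a streaming LEFT OUTER JOIN on a single key and records the change messages it emits. The class and method names (LeftJoinRetractionDemo, onLeft, onRight) are hypothetical, and this is a simplified model of the semantics, not Flink's actual join operator. It illustrates why toAppendStream() rejects the query: step 2 has to delete a row that was already emitted, which an append-only stream cannot express.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a streaming LEFT OUTER JOIN on one key.
// Not Flink's implementation; it only reproduces the +/- messages described above.
class LeftJoinRetractionDemo {

    private final Map<String, List<String>> leftByKey = new HashMap<>();
    private final Map<String, List<String>> rightByKey = new HashMap<>();
    private final List<String> changelog = new ArrayList<>();

    // A left row arrives: join it with every stored right row, or pad with null.
    public void onLeft(String key, String left) {
        leftByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(left);
        List<String> rights = rightByKey.getOrDefault(key, Collections.emptyList());
        if (rights.isEmpty()) {
            changelog.add("+(" + left + ", null)");
        } else {
            for (String right : rights) {
                changelog.add("+(" + left + ", " + right + ")");
            }
        }
    }

    // A right row arrives: if it is the first match for this key, the earlier
    // null-padded results must be retracted before the joined rows are emitted.
    public void onRight(String key, String right) {
        List<String> rights = rightByKey.computeIfAbsent(key, k -> new ArrayList<>());
        boolean firstMatch = rights.isEmpty();
        rights.add(right);
        for (String left : leftByKey.getOrDefault(key, Collections.emptyList())) {
            if (firstMatch) {
                changelog.add("-(" + left + ", null)");
            }
            changelog.add("+(" + left + ", " + right + ")");
        }
    }

    public List<String> changelog() {
        return changelog;
    }

    public static void main(String[] args) {
        LeftJoinRetractionDemo join = new LeftJoinRetractionDemo();
        join.onLeft("b1", "L1");   // step 1: no match yet
        join.onRight("b1", "R1");  // step 2: retract the padded row, emit the joined row
        join.changelog().forEach(System.out::println);
        // prints +(L1, null), -(L1, null), +(L1, R1) on separate lines
    }
}
```

With an inner join, the model would simply never emit the null-padded row, so nothing would ever need to be retracted, which matches the observation that INNER JOIN works with toAppendStream().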
Thanks so much for your answer, but then how should I perform such a comparison? Which options do we have?

Thanks

On Wed, Jul 24, 2019, 10:01 p.m., Ruidong Li <[hidden email]> wrote:
If you need an outer join, the only solution is to convert the table into a retract stream and handle the retraction messages correctly.

By the way, even then this might not perform as you would like. The query will store all input tables completely in state, so you might run out of space sooner or later if even one of the tables is an append-only stream. You need to add temporal join constraints to ensure that the query can release state that will never be joined with any future rows. I'd recommend having a look at this page [1].

Best,
Fabian

On Thu, Jul 25, 2019 at 1:54 PM, Andres Angel <[hidden email]> wrote:
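Handling the retraction messages, as Fabian suggests, could look roughly like the following. In the Flink job itself, tenv.toRetractStream(fullenrichment, Row.class) returns a DataStream<Tuple2<Boolean, Row>>, in which a true flag marks an add message and false a retract message. To keep the sketch runnable without Flink, it assumes a String in place of Row and a plain boolean in place of the Tuple2 flag; the class RetractStreamMaterializer is hypothetical. It maintains a multiset of the currently valid result rows, which is one possible way (not the only one) to apply the messages downstream. It does not address the state-size problem described above.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of consuming a retract stream. In the Flink job the input would be
//   DataStream<Tuple2<Boolean, Row>> retract = tenv.toRetractStream(fullenrichment, Row.class);
// where f0 == true is an add message and f0 == false a retract message.
// Here a String stands in for Row so the sketch runs without Flink.
class RetractStreamMaterializer {

    // Current result as a multiset: row -> number of live copies.
    private final Map<String, Integer> current = new HashMap<>();

    public void apply(boolean isAdd, String row) {
        if (isAdd) {
            current.merge(row, 1, Integer::sum);
        } else {
            // Remove one copy; returning null drops the key when the count reaches zero.
            current.computeIfPresent(row, (r, count) -> count > 1 ? count - 1 : null);
        }
    }

    public Map<String, Integer> current() {
        return current;
    }

    public static void main(String[] args) {
        RetractStreamMaterializer view = new RetractStreamMaterializer();
        // The messages a left outer join emits for one left row and, later, its match.
        view.apply(true, "(L1, null)");
        view.apply(false, "(L1, null)");
        view.apply(true, "(L1, R1)");
        System.out.println(view.current());   // prints {(L1, R1)=1}
    }
}
```

Keeping a count per row rather than a plain set matters because a join can legitimately produce duplicate rows, and each retract message should cancel exactly one earlier add.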