LEFT JOIN issue SQL API


LEFT JOIN issue SQL API

Andres Angel
Hello everyone, I have registered some tables in a table environment and am now trying to query them with a LEFT JOIN, as in the example below:

 Table fullenrichment = tenv.sqlQuery(
         // t1, t2 and t3 are tables previously registered in tenv
         "SELECT pp.a, pp.b, pp.c, pp.d, pp.a " +
         "FROM t1 pp " +
         "LEFT JOIN t2 ent ON pp.b = ent.b " +
         "LEFT JOIN t3 act ON pp.a = act.a"
 );

Once the query is defined, I need to convert the result into a DataStream<Row>:

DataStream<Row> results = tenv.toAppendStream(fullenrichment, Row.class);

I get the error below. However, if I run the same code with INNER JOIN instead of LEFT JOIN, the error disappears and the code works. Why does this happen?

1930 [main] INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Flink Kinesis Consumer is going to read the following streams: tr-stream-ingestion,
3698 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row does not contain a getter for field fields
3698 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row does not contain a setter for field fields
3698 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
3730 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row does not contain a getter for field fields
3730 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row does not contain a setter for field fields
3730 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
3753 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row does not contain a getter for field fields
3753 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row does not contain a setter for field fields
3753 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
Exception in thread "main" org.apache.flink.table.api.TableException: Table is not an append-only table. Use the toRetractStream() in order to handle add and retract messages.
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:920)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:896)
at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308)
at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262)
at consumer.trconsumer.main(trconsumer.java:180)

Re: LEFT JOIN issue SQL API

Ruidong Li
Hi, this happens because outer joins generate retractions. Consider how a LEFT OUTER JOIN behaves:

1. A left record arrives and there is no matching right record yet, so +(left, null) is emitted.
2. A matching right record arrives. The previous result must now be retracted, so -(left, null) and +(left, right) are emitted.
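To make the two steps concrete, here is a toy, in-memory Java model of a streaming LEFT OUTER JOIN. It is only an illustration under simplifying assumptions (String rows, a single join key, and the hypothetical class `LeftJoinRetractionDemo`), not Flink's actual join operator. It shows why the result cannot be append-only: the second event retracts a row that was already emitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of the change messages a streaming LEFT OUTER JOIN emits.
// Hypothetical illustration only; this is not Flink's join operator.
class LeftJoinRetractionDemo {

    // One output message: an addition (+) or a retraction (-) of a joined row.
    static final class Change {
        final boolean add;
        final String left;
        final String right; // null when no right row matched

        Change(boolean add, String left, String right) {
            this.add = add;
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return (add ? "+" : "-") + "(" + left + ", " + right + ")";
        }
    }

    private final Map<String, List<String>> leftByKey = new HashMap<>();
    private final Map<String, List<String>> rightByKey = new HashMap<>();
    private final List<Change> output = new ArrayList<>();

    void onLeft(String key, String left) {
        leftByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(left);
        List<String> matches = rightByKey.getOrDefault(key, List.of());
        if (matches.isEmpty()) {
            // Step 1: no matching right row yet, so emit the left row padded with null.
            output.add(new Change(true, left, null));
        }
        for (String right : matches) {
            output.add(new Change(true, left, right));
        }
    }

    void onRight(String key, String right) {
        List<String> rights = rightByKey.computeIfAbsent(key, k -> new ArrayList<>());
        boolean firstMatch = rights.isEmpty();
        rights.add(right);
        for (String left : leftByKey.getOrDefault(key, List.of())) {
            if (firstMatch) {
                // Step 2: the earlier null-padded row is now wrong and must be retracted.
                output.add(new Change(false, left, null));
            }
            output.add(new Change(true, left, right));
        }
    }

    List<Change> output() {
        return output;
    }

    public static void main(String[] args) {
        LeftJoinRetractionDemo join = new LeftJoinRetractionDemo();
        join.onLeft("k1", "L1");  // left row arrives first, no match yet
        join.onRight("k1", "R1"); // matching right row arrives later
        System.out.println(join.output()); // [+(L1, null), -(L1, null), +(L1, R1)]
    }
}
```

An INNER JOIN never needs step 2, because it emits nothing until both sides match, which is why the same query with INNER JOIN can be converted with toAppendStream().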

In reply to Andres Angel <[hidden email]>, Thu, 25 Jul 2019, 8:15 AM.

Re: LEFT JOIN issue SQL API

Andres Angel
Thanks so much for your answer, but then how should I perform such a comparison? Which options do we have?
Thanks 

In reply to Ruidong Li <[hidden email]>, Wed, 24 Jul 2019, 10:01 PM.

Re: LEFT JOIN issue SQL API

Fabian Hueske-2
If you need an outer join, the only solution is to convert the table into a retraction stream and correctly handle the retraction messages.
By the way, even then this might not perform as you would like.
The query will keep all input tables completely in state, so sooner or later you might run out of space if even one of the tables is an append-only stream.
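In Flink, `tenv.toRetractStream(fullenrichment, Row.class)` returns a `DataStream<Tuple2<Boolean, Row>>`, where a `true` flag marks an add message and a `false` flag marks a retract message. The consumer therefore has to maintain its own view of the result. As a rough sketch, assuming Strings stand in for `Row` and using the hypothetical class `RetractionApplier`, the messages can be applied to an in-memory multiset like this:

```java
import java.util.HashMap;
import java.util.Map;

// Applies (flag, row) messages as produced by a retract stream to an in-memory
// multiset: flag == true adds one copy of the row, flag == false removes one copy.
// Hypothetical illustration; Strings stand in for Flink Row objects.
class RetractionApplier {

    // Current result: row -> number of live copies.
    private final Map<String, Integer> counts = new HashMap<>();

    void apply(boolean isAdd, String row) {
        if (isAdd) {
            counts.merge(row, 1, Integer::sum);
            return;
        }
        // A retract message refers to a row that was added earlier.
        Integer current = counts.get(row);
        if (current == null) {
            throw new IllegalStateException("retraction for unknown row: " + row);
        }
        if (current == 1) {
            counts.remove(row);
        } else {
            counts.put(row, current - 1);
        }
    }

    Map<String, Integer> snapshot() {
        return new HashMap<>(counts);
    }

    public static void main(String[] args) {
        RetractionApplier view = new RetractionApplier();
        // The message sequence from the LEFT JOIN example in this thread.
        view.apply(true, "(L1, null)");
        view.apply(false, "(L1, null)");
        view.apply(true, "(L1, R1)");
        System.out.println(view.snapshot()); // {(L1, R1)=1}
    }
}
```

A counter per row is used rather than a plain set because a join result may legitimately contain duplicate rows, and a retraction removes only one copy.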

You need to add temporal join constraints so that the query can release state for rows that will never be joined with any future rows.
I'd recommend having a look at this page [1].
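Why a time constraint lets state be released can be shown with another toy model. Assume, hypothetically, that a left row with timestamp `t` may only join right rows whose timestamp lies in `[t - bound, t + bound]`, and that a watermark guarantees no later row carries a smaller timestamp. Once the watermark passes `t + bound`, that left row can never match again and can be dropped. Without such a bound nothing can ever be dropped, which is the unbounded-state problem described above. In Flink SQL this kind of bound is expressed as a time-windowed join predicate on time attributes; the class `BoundedJoinState` below only sketches the idea and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of state cleanup in a time-bounded join (hypothetical, not Flink code).
// A left row with timestamp t can only match right rows in [t - bound, t + bound].
// The watermark promises that no future row has a timestamp below it.
class BoundedJoinState {

    private final long bound;
    private final List<Long> leftTimestamps = new ArrayList<>();

    BoundedJoinState(long bound) {
        this.bound = bound;
    }

    void addLeft(long ts) {
        leftTimestamps.add(ts);
    }

    // Advances the watermark and evicts left rows that no future right row can join.
    void onWatermark(long watermark) {
        leftTimestamps.removeIf(ts -> ts + bound < watermark);
    }

    int stateSize() {
        return leftTimestamps.size();
    }

    public static void main(String[] args) {
        BoundedJoinState state = new BoundedJoinState(10);
        state.addLeft(0);
        state.addLeft(5);
        state.addLeft(20);
        state.onWatermark(12); // 0 + 10 < 12 is evicted; 15 and 30 are still reachable
        System.out.println(state.stateSize()); // 2
        state.onWatermark(31); // every remaining window has closed
        System.out.println(state.stateSize()); // 0
    }
}
```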

Best, Fabian


In reply to Andres Angel <[hidden email]>, Thu, 25 Jul 2019, 13:54.