Flink 1.4 SQL API Streaming TableException


Flink 1.4 SQL API Streaming TableException

Pavel Ciorba
Hi everyone!

I decided to try the time-windowed join functionality of Flink 1.4+.

My SQL query is an exact copy of the example in the documentation, and the program reads and writes from Kafka.
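For context, the documented time-windowed join corresponding to the optimizer plan further down looks roughly like this. The table and field names are taken from the plan, and the 4-hour interval matches the 14400000 ms bound in it, but the exact query text is a reconstruction, not a copy of the original:

```sql
SELECT t1.id, t1.value1, t2.value2
FROM TABLE1 t1, TABLE2 t2
WHERE t1.id = t2.id
  AND t1.`timestamp` BETWEEN t2.`timestamp` - INTERVAL '4' HOUR AND t2.`timestamp`
```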

I used the example from here:

Code:

Dependencies:
compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.4.0'
compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: '1.4.0'
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.11_2.11', version: '1.4.0'

Error:
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

FlinkLogicalJoin(condition=[AND(=($0, $5), >=($3, -($8, 14400000)), <=($3, $8))], joinType=[inner])
  FlinkLogicalTableSourceScan(table=[[TABLE1]], fields=[id, value1, timestamp], source=[KafkaJSONTableSource])
  FlinkLogicalTableSourceScan(table=[[TABLE2]], fields=[id, value2, timestamp], source=[KafkaJSONTableSource])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:251)
at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
at com.sheffield.healthmonitoring.join.conditionalhr.JoinSQL.main(JoinSQL.java:72)


I get the error in 1.4.0, 1.4.1 and 1.4.2, but 1.5-SNAPSHOT works. 

From what I can see, the feature should work in 1.4.

What might be the issue?

Thank you!
Re: Flink 1.4 SQL API Streaming TableException

杨力
To use a field in a table as a timestamp, it must be declared as a rowtime attribute of the table.

1) Call env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime).
2) Call withRowtimeAttribute on KafkaJsonTableSourceBuilder.
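Put together, the two steps above might look roughly like the sketch below. This follows the Flink 1.4 Kafka table source builder as documented at the links that follow; the topic name, field names, Kafka properties, and the 30-second out-of-orderness bound are all assumptions, so check the linked docs for the exact builder signatures before relying on this:

```java
import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSource;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;

public class RowtimeSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // Step 1: run on event time so rowtime attributes are usable in SQL.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
        props.setProperty("group.id", "demo");                    // assumption

        // Step 2: declare "ts" as a rowtime attribute on the source, reading
        // it from the existing JSON field and generating watermarks that
        // tolerate 30 seconds of out-of-orderness (bound is an assumption).
        Kafka011JsonTableSource source = Kafka011JsonTableSource.builder()
            .forTopic("table1")                                   // hypothetical topic
            .withKafkaProperties(props)
            .withSchema(TableSchema.builder()
                .field("id", Types.STRING())
                .field("value1", Types.DOUBLE())
                .field("ts", Types.SQL_TIMESTAMP())
                .build())
            .withRowtimeAttribute("ts",
                new ExistingField("ts"),
                new BoundedOutOfOrderTimestamps(30_000L))
            .build();

        tEnv.registerTableSource("TABLE1", source);
        // The same configuration would be repeated for the second table.
    }
}
```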

Reference:
1. https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#time-attributes
2. https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#configuring-a-processing-time-attribute

On Sat, Mar 10, 2018 at 4:49 AM Pavel Ciorba <[hidden email]> wrote:
Re: Flink 1.4 SQL API Streaming TableException

Pavel Ciorba
Bill Lee,

Man, you saved me from headbanging :) Thank you!

2018-03-10 0:25 GMT+02:00 杨力 <[hidden email]>: