Hey guys, I’m trying to create a Kafka backed partitioned table [1] and insert some data into it [2] using the sql-client, but I get the error [3] when doing it. Can you guys help
with this? Also, I wanted to add the partition to the table as in [4] as per the documentation, but then the creation of the table failed with the error [5]. Can you let me know why it doesn’t work? The versions used for our Flink pods deployed in Kubernetes are flink:1.11.1-scala_2.11, as in the Flink sql-training. Looking forward for your input. Thanks, Cristi [1] ` CREATE TABLE KafkaTable ( cntStart TIMESTAMP(3), cntEnd TIMESTAMP(3), cnt BIGINT, partitionByField STRING, /*other fields*/ ) PARTITIONED BY (partitionByField) WITH ( 'connector' = 'kafka', 'topic' =’topic-name’, 'properties.bootstrap.servers' = kafka-server:port’, 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); [2] INSERT INTO KafkaTable SELECT HOP_START(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS cntStart, HOP_END(someTs, INTERVAL '1' MINUTE,
INTERVAL '5' MINUTE) AS cntEnd, COUNT(DISTINCT idField) AS cnt, /*other_fields*/ FROM source_table GROUP BY HOP(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE), /*other_fields*/ [3] [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: 'default_catalog.default_database. KafkaTable s' is a partitioned table, but the underlying [Kafka universal
table sink] DynamicTableSink doesn't implement SupportsPartitioning interface. [4] `CREATE TABLE KafkaTable ( cntStart TIMESTAMP(3), cntEnd TIMESTAMP(3), cnt BIGINT, partitionByField STRING, /*other fields*/, `partition` BIGINT METADATA VIRTUAL ) PARTITIONED BY (partitionByField) WITH ( 'connector' = 'kafka', 'topic' =’topic-name’, 'properties.bootstrap.servers' = kafka-server:port’, 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); [5] [ERROR] Could not execute SQL statement. Reason: org.apache.flink.sql.parser.impl.ParseException: Encountered "METADATA" at line 12, column 22.
|
Free forum by Nabble | Edit this page |