[Flink SQL] Insert query fails for partitioned table


cristi.cioriia

Hey guys,

I’m trying to create a Kafka-backed partitioned table [1] and insert some data into it [2] using the sql-client, but the insert fails with the error in [3]. Can you help me with this? I also wanted to add the partition metadata column to the table as in [4], following the documentation, but then creating the table failed with the error in [5]. Can you let me know why that doesn’t work?

Our Flink pods deployed in Kubernetes run the flink:1.11.1-scala_2.11 image, as in the Flink sql-training.

Looking forward to your input.

Thanks,

Cristi

[1] CREATE TABLE KafkaTable (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT,
  partitionByField STRING,
  /*other fields*/
) PARTITIONED BY (partitionByField)
WITH (
  'connector' = 'kafka',
  'topic' = 'topic-name',
  'properties.bootstrap.servers' = 'kafka-server:port',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

[2] INSERT INTO KafkaTable
SELECT
  HOP_START(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS cntStart,
  HOP_END(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS cntEnd,
  COUNT(DISTINCT idField) AS cnt,
  /*other_fields*/
FROM source_table
GROUP BY HOP(someTs, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE), /*other_fields*/

[3] [ERROR] Could not execute SQL statement. Reason:

org.apache.flink.table.api.TableException: 'default_catalog.default_database.KafkaTable' is a partitioned table, but the underlying [Kafka universal table sink] DynamicTableSink doesn't implement SupportsPartitioning interface.
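
In case it helps the discussion: my current fallback idea is to drop the PARTITIONED BY clause entirely and steer the Kafka partitioning through the connector's 'sink.partitioner' option instead. A minimal sketch of that (untested on my side; the partitioner class name below is just a placeholder of mine, not an actual class of ours):

CREATE TABLE KafkaTable (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT,
  partitionByField STRING,
  /*other fields*/
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-name',
  'properties.bootstrap.servers' = 'kafka-server:port',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  /* placeholder: full class name of a custom FlinkKafkaPartitioner */
  'sink.partitioner' = 'com.example.MyCustomPartitioner'
);

Would that be the supported route for a Kafka sink?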

[4] CREATE TABLE KafkaTable (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT,
  partitionByField STRING,
  /*other fields*/
  `partition` BIGINT METADATA VIRTUAL
) PARTITIONED BY (partitionByField)
WITH (
  'connector' = 'kafka',
  'topic' = 'topic-name',
  'properties.bootstrap.servers' = 'kafka-server:port',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

[5] [ERROR] Could not execute SQL statement. Reason:

org.apache.flink.sql.parser.impl.ParseException: Encountered "METADATA" at line 12, column 22.
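
One guess on [5], which I haven't been able to confirm: metadata columns (the METADATA / VIRTUAL syntax from FLIP-107) only landed in Flink 1.12, so the 1.11.1 parser may simply not know the keyword yet. If that's the case, is upgrading the intended fix?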

Cristian Cioriia
Software Development Engineer, Adobe
Anchor Plaza, Bulevardul Timișoara 26Z, București 061331
+40751812984 (cell)
[hidden email]
www.adobe.com