I have configured a source table successfully using the following configuration:

- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
    property-version: 1
    type: kafka
    version: universal
    topic: out_order
    startup-mode: latest-offset
    properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
  format:
    property-version: 1
    type: json
    schema: "ROW(out_order_code STRING, owner_code STRING, input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei
Hi Lei, yes, Kafka as a sink is supported, albeit only for appends (no deletions/updates yet) [1]. An example is a bit hidden in the documentation [2]:
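A minimal sketch along the lines of that example, mirroring the source table above (the name/topic out_order_sink is made up for illustration, and the keys should be double-checked against the docs for your version; only append mode is supported per [1]):

- name: out_order_sink
  # "sink" marks the table writable ("both" should allow read and write;
  # some versions spell this "sink-table")
  type: sink
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
    property-version: 1
    type: kafka
    version: universal
    topic: out_order_sink
    properties:
      - key: bootstrap.servers
        value: 172.19.78.32:9092
  format:
    property-version: 1
    type: json
    schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING)"

Note that the format schema has to match the table schema, column for column.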
Thanks, it works now. It seems it was because I added the schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)" under the format label.
Hi Lei, CREATE TABLE DDL [1][2] has been the recommended way to register a table since 1.9, and the YAML way may be deprecated in the future. By using DDL, a registered table can be used as both a source and a sink; a sketch follows below. Best, Jark
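A rough sketch of such a DDL for the out_order table from earlier in the thread, using Flink 1.10-style connector properties (the exact property keys are assumptions and should be checked against the docs for the version in use):

CREATE TABLE out_order (
  out_order_code STRING,
  input_date BIGINT,
  owner_code STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'out_order',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.zookeeper.connect' = '172.19.78.32:2181',
  'connector.properties.bootstrap.servers' = '172.19.78.32:9092',
  'format.type' = 'json',
  'format.derive-schema' = 'true',
  'update-mode' = 'append'
);

Once registered this way, the same table can be read with SELECT and written with INSERT INTO.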
Hi Jark, I have tried to use CREATE TABLE DDL. First I ran ./bin/sql-client.sh embedded, then created a table from the Kafka topic, and it told me the table had been created. But when I query with select * from tableName, there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Perhaps I need to add some jar to the lib directory? But if I write the table configuration in the sql-client-defaults.yaml file, I can select the result correctly (the session is sketched below). Thanks, Lei
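Condensed, the session looks like this (out_order stands in for the actual table name, the DDL body is elided, and the client output is paraphrased from the description above, not verbatim):

$ ./bin/sql-client.sh embedded

Flink SQL> CREATE TABLE out_order ( ... ) WITH ( ... );
[INFO] Table has been created.

Flink SQL> SELECT * FROM out_order;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in the classpath.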
Hi Lei, yes. If you are creating a Kafka table, then the Kafka connector jar and some format jars are required. That's weird, though: if the DDL fails, the YAML way should fail with the same exception, unless some connector property value is not the same. Could you share the detailed exception stack? Best, Jark
I am using flink-1.10, but I had added flink-json-1.9.1.jar and flink-sql-connector-kafka_2.11-1.9.1.jar to the lib directory. After changing to flink-json-1.10.0.jar and flink-sql-connector-kafka_2.12-1.10.0.jar, it works (the fix is sketched below). But I have no idea why the YAML way works when I use flink-json-1.9.1.jar and flink-sql-connector-kafka_2.11-1.9.1.jar in the flink-1.10 environment. Thanks, Lei
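The fix as shell commands, with $FLINK_HOME and the source paths as placeholders:

# Remove the 1.9.1 jars that don't match the Flink 1.10 distribution
rm $FLINK_HOME/lib/flink-json-1.9.1.jar
rm $FLINK_HOME/lib/flink-sql-connector-kafka_2.11-1.9.1.jar
# Install jars whose version (and Scala suffix) match the distribution
cp /path/to/flink-json-1.10.0.jar $FLINK_HOME/lib/
cp /path/to/flink-sql-connector-kafka_2.12-1.10.0.jar $FLINK_HOME/lib/
# Restart the SQL client afterwards so the new jars are picked up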