Does flink-1.10 sql-client support kafka sink?


Does flink-1.10 sql-client support kafka sink?

wanglei2@geekplus.com.cn

I have configured a source table successfully using the following configuration:

  - name: out_order
    type: source
    update-mode: append
    schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: out_order
      startup-mode: latest-offset
      properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
    format:
      property-version: 1
      type: json
      schema: "ROW(out_order_code STRING, owner_code STRING, input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei

Re: Does flink-1.10 sql-client support kafka sink?

Arvid Heise-3
Hi Lei,

Yes, Kafka as a sink is supported, albeit only for appends (no deletions/updates yet) [1].

An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)
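
Once the sink is registered, an append-only query can write into it with INSERT INTO. A minimal sketch (MyTableSource is a hypothetical source table with matching columns, just for illustration):

INSERT INTO MyTableSink
SELECT rideId, lon, lat, rideTime
FROM MyTableSource;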


Re: Re: Does flink-1.10 sql-client support kafka sink?

wanglei2@geekplus.com.cn
Thanks, it works now.

It seems the earlier failure was because I had added
   schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)"
under the format label.
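
So presumably the fix is to let the format derive its schema from the table schema instead of declaring it twice, along these lines:

format:
  property-version: 1
  type: json
  derive-schema: true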

Re: Re: Does flink-1.10 sql-client support kafka sink?

Jark Wu-3
Hi Lei,

CREATE TABLE DDL [1][2] has been the recommended way to register a table since 1.9, and the YAML way might be deprecated in the future.
By using DDL, a registered table can be used as both a source and a sink.
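
For example, the out_order table above could be declared with a DDL along these lines (a minimal sketch using the Flink 1.10 'connector.*'/'format.*' property keys; the addresses are taken from the config earlier in the thread):

CREATE TABLE out_order (
  out_order_code STRING,
  input_date BIGINT,
  owner_code STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'out_order',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.zookeeper.connect' = '172.19.78.32:2181',
  'connector.properties.bootstrap.servers' = '172.19.78.32:9092',
  'format.type' = 'json'
);

The same registered table can then be read with SELECT ... FROM out_order or written with INSERT INTO out_order ...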

Best,
Jark



Re: Re: Does flink-1.10 sql-client support kafka sink?

wanglei2@geekplus.com.cn
Hi Jark, 

I have tried the CREATE TABLE DDL.
First I ran ./bin/sql-client.sh embedded, then created a table from a kafka topic, and it told me the table had been created.
But when I query it with select * from tableName, there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Perhaps I need to add some jar to the lib directory.
But if I write the table configuration in the sql-client-defaults.yaml file, I can select the result correctly.

Thanks,
Lei

Re: Re: Does flink-1.10 sql-client support kafka sink?

Jark Wu-3
Hi Lei,

Yes. If you are creating a Kafka table, then the kafka connector jar and some format jars are required.

That's weird. If the DDL fails, the YAML way should fail with the same exception, unless some connector property values are not the same.

Could you share the detailed exception stack? 

Best,
Jark


Re: Re: Does flink-1.10 sql-client support kafka sink?

wanglei2@geekplus.com.cn

I am using flink-1.10, but I had added flink-json-1.9.1.jar and flink-sql-connector-kafka_2.11-1.9.1.jar to the lib directory.
After changing to flink-json-1.10.0.jar and flink-sql-connector-kafka_2.12-1.10.0.jar, it works.

But I have no idea why the YAML way worked when I used flink-json-1.9.1.jar and flink-sql-connector-kafka_2.11-1.9.1.jar in the flink-1.10 environment.

Thanks,
Lei