Ask for SQL using Kafka in Flink


Rad Rad
Hi,

Could anyone help me solve this problem?


Exception in thread "main" java.lang.Error: Unresolved compilation problem:
        The constructor Kafka09JsonTableSource(String, Properties, TypeInformation<Row>) is undefined

This is the code:
public class FlinkKafkaSQL {
    public static void main(String[] args) throws Exception {
        // Read parameters from command line
        final ParameterTool params = ParameterTool.fromArgs(args);

        if (params.getNumberOfParameters() < 5) {
            System.out.println("\nUsage: FlinkReadKafka " +
                               "--read-topic <topic> " +
                               "--write-topic <topic> " +
                               "--bootstrap.servers <kafka brokers> " +
                               "zookeeper.connect" +
                               "--group.id <groupid>");
            return;
        }

        // setup streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(300000); // 300 seconds
        env.getConfig().setGlobalJobParameters(params);

        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // specify JSON field names and types
        TypeInformation<Row> typeInfo2 = Types.ROW(
                new String[] { "iotdevice", "sensorID" },
                new TypeInformation<?>[] { Types.STRING() }
        );

        // create a new tablesource of JSON from kafka
        KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
                params.getRequired("read-topic"),
                params.getProperties(),
                typeInfo2);

        // run some SQL to filter results where a key is not null
        String sql = "SELECT sensorID " +
                     "FROM iotdevice ";
        tableEnv.registerTableSource("iotdevice", kafkaTableSource);
        Table result = tableEnv.sql(sql);

        // create a partition for the data going into kafka
        FlinkFixedPartitioner partition = new FlinkFixedPartitioner();

        // create new tablesink of JSON to kafka
        KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
                params.getRequired("write-topic"),
                params.getProperties(),
                partition);

        result.writeToSink(kafkaTableSink);

        env.execute("FlinkReadWriteKafkaJSON");
    }
}


These are the dependencies in pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
</dependencies>


Regards.



Re: Ask for SQL using Kafka in Flink

Rong Rong
Hi Radhya,

Can you tell us which Flink version you are using? Based on the latest Flink 1.5 release, Kafka09JsonTableSource takes:
/**
* Creates a Kafka 0.9 JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
*/
Also, in your type definition TypeInformation<Row> typeInfo2 = Types.ROW(...), the arrays of schema names and types seem to have different lengths.
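Something like the following is a minimal sketch of that four-argument constructor in use, dropped into the main() of your original post (so it reuses params). I am assuming the array-based TableSchema constructor from flink-table here, and in some releases the constructor is not public and the builder has to be used instead, so please verify against your version:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;

// Field names and field types must line up one-to-one.
TypeInformation<Row> jsonSchema = Types.ROW(
        new String[] { "sensorID" },
        new TypeInformation<?>[] { Types.STRING() });

// Here the table schema can simply mirror the JSON schema.
TableSchema tableSchema = new TableSchema(
        new String[] { "sensorID" },
        new TypeInformation<?>[] { Types.STRING() });

// (topic, properties, tableSchema, jsonSchema), as in the javadoc above.
KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
        params.getRequired("read-topic"),
        params.getProperties(),
        tableSchema,
        jsonSchema);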

Thanks,
Rong

Re: Ask for SQL using Kafka in Flink

Rad Rad
Thanks Rong,

I used Flink 1.3.0. In the case of using Flink 1.5, how can I define the jsonSchema?

Yes, there are two names, but now I put only one name, and I want to define the jsonSchema.

Re: Ask for SQL using Kafka in Flink

Timo Walther
Hi,

as you can see in the code [1], Kafka09JsonTableSource takes a TableSchema. You can create a table schema from type information, see [2].

Regards,
Timo

[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
[2] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
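For example, a rough sketch (the TableSchema.fromTypeInfo factory is assumed from [2]; if your release does not have it, the array-based TableSchema constructor shown earlier in the thread works as well):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;

// Row type information with one type per field name.
TypeInformation<Row> typeInfo = Types.ROW(
        new String[] { "sensorID" },
        new TypeInformation<?>[] { Types.STRING() });

// Derive the table schema from the type information, see [2].
TableSchema tableSchema = TableSchema.fromTypeInfo(typeInfo);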

Re: Ask for SQL using Kafka in Flink

Shuyi Chen
Given the popularity of Flink SQL and Kafka as a streaming source, I think we can add some examples of using Kafka[XXX]TableSource in the flink-examples/flink-examples-table module. What do you guys think?
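As a starting point, something along these lines could work. This is an untested sketch assembled from the constructor and schema discussion above, so all class names and signatures should be verified against the release you build against:

import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSink;
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSource;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class KafkaFlinkSqlExample {
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final Properties props = params.getProperties();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // One field name per field type (the length mismatch from the
        // original post corrected).
        TypeInformation<Row> rowType = Types.ROW(
                new String[] { "sensorID" },
                new TypeInformation<?>[] { Types.STRING() });
        TableSchema schema = new TableSchema(
                new String[] { "sensorID" },
                new TypeInformation<?>[] { Types.STRING() });

        // Source: (topic, properties, tableSchema, jsonSchema), as quoted
        // from the javadoc earlier in the thread.
        Kafka09JsonTableSource source = new Kafka09JsonTableSource(
                params.getRequired("read-topic"), props, schema, rowType);
        tableEnv.registerTableSource("iotdevice", source);

        Table result = tableEnv.sqlQuery("SELECT sensorID FROM iotdevice");

        // Sink: JSON rows back to Kafka with fixed partitioning.
        Kafka09JsonTableSink sink = new Kafka09JsonTableSink(
                params.getRequired("write-topic"), props,
                new FlinkFixedPartitioner<Row>());
        result.writeToSink(sink);

        env.execute("KafkaFlinkSqlExample");
    }
}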

Cheers
Shuyi

--
"So you have to trust that the dots will somehow connect in your future."
Re: Ask for SQL using Kafka in Flink

Rad Rad
In reply to this post by Timo Walther
Thanks Timo. I will go over those.




Re: Ask for SQL using Kafka in Flink

Rad Rad
In reply to this post by Shuyi Chen

Yes, I totally agree, Shuyi Chen. If anyone has an example which does the streaming pipeline processing (Kafka-Flink-SQL), it will be useful for all of us.





Re: Ask for SQL using Kafka in Flink

Will Du
In reply to this post by Shuyi Chen
Yes, I am also looking for Kafka Avro table examples in Java and on the command line. Also, a Kafka Avro table sink is still missing. In addition, once we have a Kafka topic, the API should read the schema directly from a schema file or a schema registry. The current API's support lacks flexibility, just my own opinion.

Sent from my iPhone

Re: Ask for SQL using Kafka in Flink

Timo Walther
@Shuyi: Yes, a more advanced table example would be helpful anyway, and combining it with Kafka/Avro end-to-end would be even better.

@Will: I totally agree that the current connector ecosystem could be improved. This is also on the mid-term roadmap, and contributors who could help here are very welcome. We have also taken steps towards improving the situation with [1][2] etc.

[1] https://issues.apache.org/jira/browse/FLINK-8240
[2] https://issues.apache.org/jira/browse/FLINK-8630

Regards,
Timo

