Java Code for Kafka Flink SQL

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Java Code for Kafka Flink SQL

Rad Rad
Hi,

Could any one help me by providing some sample java code which Flink
subscribes data data from kafka and then doing SQL queries using SQL APIs.

Also, what are the compatible versions for java/kafka/flink.

Since, I am beginner and there are many exceptions in my code


public class FlinkKafkaSQL {

>         public static void main(String[] args) throws Exception {
>             // Read parameters from command line
>             final ParameterTool params = ParameterTool.fromArgs(args);
>
>             if(params.getNumberOfParameters() < 5) {
>                 System.out.println("\nUsage: FlinkReadKafka " +
>                                    "--read-topic <topic> " +
>                                    "--write-topic <topic> " +
>                                    "--bootstrap.servers <kafka brokers> "
> +
>                                    "zookeeper.connect" +
>                                    "--group.id <groupid>");
>                 return;
>             }
>
>             // setup streaming environment
>             StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
> 10000));
>             env.enableCheckpointing(300000); // 300 seconds
>             env.getConfig().setGlobalJobParameters(params);
>
>             StreamTableEnvironment tableEnv =
> TableEnvironment.getTableEnvironment(env);
>
>             // specify JSON field names and types
>
>
>             TypeInformation<Row> typeInfo2 = Types.ROW(
>                     new String[] { "iotdevice", "sensorID" },
>                     new TypeInformation<?>[] { Types.STRING()}
>             );
>
>             // create a new tablesource of JSON from kafka
>             KafkaJsonTableSource kafkaTableSource = new
> Kafka09JsonTableSource(
>                     params.getRequired("read-topic"),
>                     params.getProperties(),
>                     typeInfo2);
>
>             // run some SQL to filter results where a key is not null
>             String sql = "SELECT sensorID " +
>                          "FROM iotdevice ";
>             tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>             Table result = tableEnv.sql(sql);
>
>             // create a partition for the data going into kafka
>             FlinkFixedPartitioner partition =  new
> FlinkFixedPartitioner();
>
>             // create new tablesink of JSON to kafka
>             KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>                     params.getRequired("write-topic"),
>                     params.getProperties(),
>                     partition);
>
>             result.writeToSink(kafkaTableSink);
>
>             env.execute("FlinkReadWriteKafkaJSON");
>         }
> }
>
>
> *This is the dependencies  in pom.xml*
>
>         <dependencies>
>             <dependency>
>                 <groupId>org.apache.flink</groupId>
>                 <artifactId>flink-java</artifactId>
>                 <version>1.3.0</version>
>             </dependency>
>             <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-streaming-java_2.11</artifactId>
>                         <version>1.3.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-clients_2.11</artifactId>
>                         <version>1.3.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-connector-kafka-0.9</artifactId>
>
> <version>1.3.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-table_2.11</artifactId>
>                         <version>1.3.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-core</artifactId>
>                         <version>1.3.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-streaming-
> scala_2.11</artifactId>
>                         <version>1.3.0</version>
>                 </dependency>
>         </dependencies>
>
>
> Regards.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
 


Thank you.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Java Code for Kafka Flink SQL

Timo Walther
Hi Rad,

at a first glance your example does not look too bad. Which exceptions
do you get? Did you create your pom.xml with the provided template [1]
and then added flink-table, flink-connector-kafkaXXX, flink-streaming-scala?

Regards,
Timo

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/quickstart/java_api_quickstart.html

Am 02.06.18 um 19:26 schrieb Rad Rad:

> Hi,
>
> Could any one help me by providing some sample java code which Flink
> subscribes data data from kafka and then doing SQL queries using SQL APIs.
>
> Also, what are the compatible versions for java/kafka/flink.
>
> Since, I am beginner and there are many exceptions in my code
>
>
> public class FlinkKafkaSQL {
>>          public static void main(String[] args) throws Exception {
>>              // Read parameters from command line
>>              final ParameterTool params = ParameterTool.fromArgs(args);
>>
>>              if(params.getNumberOfParameters() < 5) {
>>                  System.out.println("\nUsage: FlinkReadKafka " +
>>                                     "--read-topic <topic> " +
>>                                     "--write-topic <topic> " +
>>                                     "--bootstrap.servers <kafka brokers> "
>> +
>>                                     "zookeeper.connect" +
>>                                     "--group.id <groupid>");
>>                  return;
>>              }
>>
>>              // setup streaming environment
>>              StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
>> 10000));
>>              env.enableCheckpointing(300000); // 300 seconds
>>              env.getConfig().setGlobalJobParameters(params);
>>
>>              StreamTableEnvironment tableEnv =
>> TableEnvironment.getTableEnvironment(env);
>>
>>              // specify JSON field names and types
>>
>>
>>              TypeInformation<Row> typeInfo2 = Types.ROW(
>>                      new String[] { "iotdevice", "sensorID" },
>>                      new TypeInformation<?>[] { Types.STRING()}
>>              );
>>
>>              // create a new tablesource of JSON from kafka
>>              KafkaJsonTableSource kafkaTableSource = new
>> Kafka09JsonTableSource(
>>                      params.getRequired("read-topic"),
>>                      params.getProperties(),
>>                      typeInfo2);
>>
>>              // run some SQL to filter results where a key is not null
>>              String sql = "SELECT sensorID " +
>>                           "FROM iotdevice ";
>>              tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>>              Table result = tableEnv.sql(sql);
>>
>>              // create a partition for the data going into kafka
>>              FlinkFixedPartitioner partition =  new
>> FlinkFixedPartitioner();
>>
>>              // create new tablesink of JSON to kafka
>>              KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>>                      params.getRequired("write-topic"),
>>                      params.getProperties(),
>>                      partition);
>>
>>              result.writeToSink(kafkaTableSink);
>>
>>              env.execute("FlinkReadWriteKafkaJSON");
>>          }
>> }
>>
>>
>> *This is the dependencies  in pom.xml*
>>
>>          <dependencies>
>>              <dependency>
>>                  <groupId>org.apache.flink</groupId>
>>                  <artifactId>flink-java</artifactId>
>>                  <version>1.3.0</version>
>>              </dependency>
>>              <dependency>
>>                          <groupId>org.apache.flink</groupId>
>>                          <artifactId>flink-streaming-java_2.11</artifactId>
>>                          <version>1.3.0</version>
>>                  </dependency>
>>                  <dependency>
>>                          <groupId>org.apache.flink</groupId>
>>                          <artifactId>flink-clients_2.11</artifactId>
>>                          <version>1.3.0</version>
>>                  </dependency>
>>                  <dependency>
>>                          <groupId>org.apache.flink</groupId>
>>                          <artifactId>flink-connector-kafka-0.9</artifactId>
>>
>> <version>1.3.0</version>
>>                  </dependency>
>>                  <dependency>
>>                          <groupId>org.apache.flink</groupId>
>>                          <artifactId>flink-table_2.11</artifactId>
>>                          <version>1.3.0</version>
>>                  </dependency>
>>                  <dependency>
>>                          <groupId>org.apache.flink</groupId>
>>                          <artifactId>flink-core</artifactId>
>>                          <version>1.3.0</version>
>>                  </dependency>
>>                  <dependency>
>>                          <groupId>org.apache.flink</groupId>
>>                          <artifactId>flink-streaming-
>> scala_2.11</artifactId>
>>                          <version>1.3.0</version>
>>                  </dependency>
>>          </dependencies>
>>
>>
>> Regards.
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/
>>
>  
>
>
> Thank you.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/