Hi,
Could anyone help me by providing some sample Java code in which Flink subscribes to data from Kafka and then runs SQL queries using the SQL APIs. Also, what are the compatible versions for Java/Kafka/Flink? Since I am a beginner and there are many exceptions in my code. public class FlinkKafkaSQL { > public static void main(String[] args) throws Exception { > // Read parameters from command line > final ParameterTool params = ParameterTool.fromArgs(args); > > if(params.getNumberOfParameters() < 5) { > System.out.println("\nUsage: FlinkReadKafka " + > "--read-topic <topic> " + > "--write-topic <topic> " + > "--bootstrap.servers <kafka brokers> " > + > "zookeeper.connect" + > "--group.id <groupid>"); > return; > } > > // setup streaming environment > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, > 10000)); > env.enableCheckpointing(300000); // 300 seconds > env.getConfig().setGlobalJobParameters(params); > > StreamTableEnvironment tableEnv = > TableEnvironment.getTableEnvironment(env); > > // specify JSON field names and types > > > TypeInformation<Row> typeInfo2 = Types.ROW( > new String[] { "iotdevice", "sensorID" }, > new TypeInformation<?>[] { Types.STRING()} > ); > > // create a new tablesource of JSON from kafka > KafkaJsonTableSource kafkaTableSource = new > Kafka09JsonTableSource( > params.getRequired("read-topic"), > params.getProperties(), > typeInfo2); > > // run some SQL to filter results where a key is not null > String sql = "SELECT sensorID " + > "FROM iotdevice "; > tableEnv.registerTableSource("iotdevice", kafkaTableSource); > Table result = tableEnv.sql(sql); > > // create a partition for the data going into kafka > FlinkFixedPartitioner partition = new > FlinkFixedPartitioner(); > > // create new tablesink of JSON to kafka > KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink( > params.getRequired("write-topic"), > 
params.getProperties(), > partition); > > result.writeToSink(kafkaTableSink); > > env.execute("FlinkReadWriteKafkaJSON"); > } > } > > > *This is the dependencies in pom.xml* > > <dependencies> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-java</artifactId> > <version>1.3.0</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-java_2.11</artifactId> > <version>1.3.0</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-clients_2.11</artifactId> > <version>1.3.0</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka-0.9</artifactId> > > <version>1.3.0</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table_2.11</artifactId> > <version>1.3.0</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-core</artifactId> > <version>1.3.0</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming- > scala_2.11</artifactId> > <version>1.3.0</version> > </dependency> > </dependencies> > > > Regards. > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050. > n4.nabble.com/ > Thank you. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi Rad,
at a first glance your example does not look too bad. Which exceptions do you get? Did you create your pom.xml with the provided template [1] and then added flink-table, flink-connector-kafkaXXX, flink-streaming-scala? Regards, Timo [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/quickstart/java_api_quickstart.html Am 02.06.18 um 19:26 schrieb Rad Rad: > Hi, > > Could any one help me by providing some sample java code which Flink > subscribes data data from kafka and then doing SQL queries using SQL APIs. > > Also, what are the compatible versions for java/kafka/flink. > > Since, I am beginner and there are many exceptions in my code > > > public class FlinkKafkaSQL { >> public static void main(String[] args) throws Exception { >> // Read parameters from command line >> final ParameterTool params = ParameterTool.fromArgs(args); >> >> if(params.getNumberOfParameters() < 5) { >> System.out.println("\nUsage: FlinkReadKafka " + >> "--read-topic <topic> " + >> "--write-topic <topic> " + >> "--bootstrap.servers <kafka brokers> " >> + >> "zookeeper.connect" + >> "--group.id <groupid>"); >> return; >> } >> >> // setup streaming environment >> StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >> env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, >> 10000)); >> env.enableCheckpointing(300000); // 300 seconds >> env.getConfig().setGlobalJobParameters(params); >> >> StreamTableEnvironment tableEnv = >> TableEnvironment.getTableEnvironment(env); >> >> // specify JSON field names and types >> >> >> TypeInformation<Row> typeInfo2 = Types.ROW( >> new String[] { "iotdevice", "sensorID" }, >> new TypeInformation<?>[] { Types.STRING()} >> ); >> >> // create a new tablesource of JSON from kafka >> KafkaJsonTableSource kafkaTableSource = new >> Kafka09JsonTableSource( >> params.getRequired("read-topic"), >> params.getProperties(), >> typeInfo2); >> >> // run some SQL to filter results where a key is not 
null >> String sql = "SELECT sensorID " + >> "FROM iotdevice "; >> tableEnv.registerTableSource("iotdevice", kafkaTableSource); >> Table result = tableEnv.sql(sql); >> >> // create a partition for the data going into kafka >> FlinkFixedPartitioner partition = new >> FlinkFixedPartitioner(); >> >> // create new tablesink of JSON to kafka >> KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink( >> params.getRequired("write-topic"), >> params.getProperties(), >> partition); >> >> result.writeToSink(kafkaTableSink); >> >> env.execute("FlinkReadWriteKafkaJSON"); >> } >> } >> >> >> *This is the dependencies in pom.xml* >> >> <dependencies> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-java</artifactId> >> <version>1.3.0</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-streaming-java_2.11</artifactId> >> <version>1.3.0</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-clients_2.11</artifactId> >> <version>1.3.0</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-connector-kafka-0.9</artifactId> >> >> <version>1.3.0</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table_2.11</artifactId> >> <version>1.3.0</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-core</artifactId> >> <version>1.3.0</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-streaming- >> scala_2.11</artifactId> >> <version>1.3.0</version> >> </dependency> >> </dependencies> >> >> >> Regards. >> >> >> >> -- >> Sent from: http://apache-flink-user-mailing-list-archive.2336050. >> n4.nabble.com/ >> > > > > Thank you. > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Free forum by Nabble | Edit this page |