package org.monitoring.stream.analytics;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.WindowedTable;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.java.Tumble;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class FlinkTableSourceLatest {

    public static void main(String[] args) throws Exception {
        // Read parameters from application.properties (not from the command line,
        // despite the usage message below naming command-line style flags).
        final ParameterTool params = ParameterTool.fromPropertiesFile("application.properties");
        if (params.getNumberOfParameters() < 4) {
            System.out.println("\nUsage: FlinkReadKafka "
                    + "--read-topic "
                    + "--write-topic "
                    + "--bootstrap.servers "
                    + "--group.id ");
            return;
        }

        // Set up the streaming environment: event time, a fixed-delay restart
        // strategy (4 attempts, 10 s apart), and a checkpoint every 300 s.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(300000); // 300 seconds
        env.getConfig().setGlobalJobParameters(params);

        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Register a Kafka source table. The JSON format derives its schema from
        // the table schema; an explicit .jsonSchema(...) string could be supplied
        // instead of deriveSchema(). "processingTime" is a processing-time
        // attribute, not a field parsed from the JSON payload.
        tableEnv
            .connect(
                new Kafka()
                    .version("0.11")
                    .topic(params.getRequired("read-topic"))
                    .property("zookeeper.connect", "localhost:2181")
                    .property("bootstrap.servers", params.getRequired("bootstrap.servers"))
                    .property("group.id", params.getRequired("group.id"))
                    .startFromLatest())
            .withFormat(
                new Json()
                    .failOnMissingField(false)
                    .deriveSchema())
            .withSchema(
                new Schema()
                    .field("food", "VARCHAR")
                    .field("price", "DECIMAL")
                    .field("processingTime", "TIMESTAMP").proctime())
            .inAppendMode()
            .registerTableSource("foodTable");

        // Note: the BETWEEN predicate compares processingTime against itself,
        // so it is always true and every row passes the filter.
        String sql = "SELECT food FROM foodTable "
                + "WHERE processingTime BETWEEN processingTime - INTERVAL '4' HOUR AND processingTime";

        // Defines a 50-minute tumbling window over processingTime; it is not
        // referenced by the SQL query below.
        WindowedTable windowedTable = tableEnv.scan("foodTable")
                .window(Tumble.over("50.minutes").on("processingTime").as("userActionWindow"));

        Table result = tableEnv.sqlQuery(sql);
        // This prints the Table's internal reference, not its rows.
        System.out.println("======================= " + result.toString());

        // Register a Kafka sink table with round-robin partitioning. A
        // Kafka010JsonTableSink with a FlinkFixedPartitioner would be an
        // alternative to this descriptor-based sink.
        tableEnv
            .connect(
                new Kafka()
                    .version("0.11")
                    .topic(params.getRequired("write-topic"))
                    .property("zookeeper.connect", "localhost:2181")
                    .property("bootstrap.servers", params.getRequired("bootstrap.servers"))
                    .property("group.id", params.getRequired("group.id"))
                    .sinkPartitionerRoundRobin()
                    .startFromLatest())
            .withFormat(
                new Json()
                    .failOnMissingField(false)
                    .deriveSchema())
            .withSchema(
                new Schema()
                    .field("food", "VARCHAR"))
            .inAppendMode()
            .registerTableSink("ruleTable");

        // Write the query result to the sink table and launch the job.
        result.insertInto("ruleTable");
        env.execute("FlinkReadWriteKafkaJSON");
    }
}
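/*
 * A minimal application.properties sketch for the four keys checked above.
 * The broker address and group id mirror the values hard-coded in the
 * original descriptors; the topic names are hypothetical placeholders,
 * not values confirmed by the original code:
 *
 *   read-topic=testin
 *   write-topic=testout
 *   bootstrap.servers=localhost:9092
 *   group.id=analytics
 */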