Hi,
I'm trying to run the following streaming program in my local flink 1.3.2 environment. The program compile and run without any errors but the print() call doesn't display anything. Once i stop the program i receive all aggregated data. Any ideas how to make it output regularly or when new data come/old data updated?
package flink; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.Slide; import org.apache.flink.table.api.java.StreamTableEnvironment; import java.sql.Timestamp; public class StreamingJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env); SingleOutputStreamOperator<WC> input = env .socketTextStream("localhost", 9000, "\n") .map(new MapFunction<String, WC>() { @Override public WC map(String value) throws Exception { String[] row = value.split(","); Timestamp timestamp = Timestamp.valueOf(row[2]); return new WC(row[0], Long.valueOf(row[1]), timestamp); } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) { @Override public long extractTimestamp(WC element) { return element.dt.getTime(); } }); tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime"); Table table = tEnv.scan("WordCount") .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w")) .groupBy("w, word") .select("word, frequency.sum as frequency, w.start as dt"); DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class); result.print(); env.execute(); } public static class WC { public String word; public long frequency; public Timestamp dt; public WC() { } public WC(String word, long frequency, Timestamp dt) { this.word = word; this.frequency = frequency; this.dt = dt; } @Override public String toString() { return "WC " + word + " " + frequency + " " + dt.getTime(); } } }
Sample input:
hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04
Thanks
Free forum by Nabble | Edit this page |