Hi Fabian,
Thank you for your response! I don't think that is necessary, because I already call
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
which does exactly what you describe: it sets the watermark interval to 200 ms. I think I found the problem: it is the default event-time trigger attached to the window assigner. According to the docs here https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html : "all the event-time window assigners have an EventTimeTrigger as default trigger. This trigger simply fires once the watermark passes the end of a window." So all I have to do to trigger the computation is send an event that falls into the "next" window. The question now is: how can I set a trigger that fires at regular intervals (e.g. every 5 seconds) using the Table API?
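For anyone following along, the effect can be reproduced without Flink. This is only a minimal sketch in plain Java, not Flink code: the class and method names (WatermarkSketch, watermark, fires, windowStarts) are made up for illustration. It assumes that the bounded-out-of-orderness extractor reports a watermark equal to the highest timestamp seen minus the 10 second bound, and that the event-time trigger fires once the watermark reaches the last millisecond of a window. Timestamps are milliseconds relative to 13:10:00 from my sample input.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {

    // Assumption: watermark = highest timestamp seen so far minus the out-of-orderness bound.
    static long watermark(long maxTimestampSeen, long boundMs) {
        return maxTimestampSeen - boundMs;
    }

    // Assumption: a window [start, end) fires once the watermark reaches end - 1,
    // i.e. the last timestamp that still belongs to the window.
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    // Start timestamps of all sliding windows that contain ts (non-negative ts only).
    static List<Long> windowStarts(long ts, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slideMs);
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 10_000L, slide = 5_000L, bound = 10_000L;

        // Earliest window that contains the first sample event (13:10:01 -> 1000 ms).
        List<Long> starts = windowStarts(1_000L, size, slide);
        long earliestEnd = starts.get(starts.size() - 1) + size;

        // The four sample events, followed by one hypothetical later event at 13:10:15.
        long[] events = {1_000L, 2_000L, 3_000L, 4_000L, 15_000L};
        long maxSeen = Long.MIN_VALUE;
        for (long ts : events) {
            maxSeen = Math.max(maxSeen, ts);
            long wm = watermark(maxSeen, bound);
            System.out.println("event=" + ts + " watermark=" + wm
                    + " earliestWindowFires=" + fires(earliestEnd, wm));
        }
    }
}
```

Under these assumptions the four sample events never push the watermark far enough, so nothing is printed until a much later event arrives or the job is stopped.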
Hi,
you are using a BoundedOutOfOrdernessTimestampExtractor to generate watermarks.
The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark assigner and only generates watermarks if a watermark interval is configured.
Without watermarks, the query cannot "make progress" and only computes its result when the program is closed (sources emit a MAX_LONG watermark when being canceled).
Long story short: you need to configure the watermark interval: env.getConfig.setAutoWatermarkInterval(100L);
Best, Fabian
2017-12-14 16:30 GMT+01:00 Plamen Paskov <[hidden email]>:
Hi,
I'm trying to run the following streaming program in my local Flink 1.3.2 environment. The program compiles and runs without any errors, but the print() call doesn't display anything. Once I stop the program, I receive all the aggregated data. Any ideas how to make it produce output regularly, or whenever new data arrives or old data is updated?
package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);

        SingleOutputStreamOperator<WC> input = env
            .socketTextStream("localhost", 9000, "\n")
            .map(new MapFunction<String, WC>() {
                @Override
                public WC map(String value) throws Exception {
                    String[] row = value.split(",");
                    Timestamp timestamp = Timestamp.valueOf(row[2]);
                    return new WC(row[0], Long.valueOf(row[1]), timestamp);
                }
            })
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(WC element) {
                        return element.dt.getTime();
                    }
                });

        tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");

        Table table = tEnv.scan("WordCount")
            .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
            .groupBy("w, word")
            .select("word, frequency.sum as frequency, w.start as dt");

        DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
        result.print();

        env.execute();
    }

    public static class WC {
        public String word;
        public long frequency;
        public Timestamp dt;

        public WC() {
        }

        public WC(String word, long frequency, Timestamp dt) {
            this.word = word;
            this.frequency = frequency;
            this.dt = dt;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency + " " + dt.getTime();
        }
    }
}
Sample input:
hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04
Thanks