Hi Fabian,
Thank you for your response! I don't think that is necessary, because I already have a call to
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); which does exactly what you describe: it sets the watermark interval to 200 ms. I think I found the problem: it is the default event-time trigger attached to the window assigner. According to the docs at https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html : "all the event-time window assigners have an EventTimeTrigger as default trigger. This trigger simply fires once the watermark passes the end of a window." So all I have to do to trigger the computation is to send an event that falls into the "next" window. The question now is: how can I set the trigger to fire at regular intervals (e.g. every 5 seconds) using the Table API?
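To make the trigger behaviour concrete, here is a small plain-Java sketch (no Flink dependency) that replays the sample input from the original post against the arithmetic involved. It assumes the watermark is the largest timestamp seen minus the 10-second bound, that sliding windows are aligned to the epoch, and that a window [start, end) fires once the watermark reaches end - 1 ms. The class and method names are made up for illustration and are not Flink APIs. Timestamps are milliseconds relative to 13:10:00.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; names are hypothetical, not part of Flink.
class WindowFiringSketch {
    static final long BOUND_MS = 10_000; // Time.seconds(10) out-of-orderness bound
    static final long SIZE_MS = 10_000;  // Slide.over("10.seconds")
    static final long SLIDE_MS = 5_000;  // .every("5.seconds")

    // Assumed watermark rule: largest timestamp seen minus the bound.
    static long watermarkFor(long maxTimestampSeen) {
        return maxTimestampSeen - BOUND_MS;
    }

    // Start times of all sliding windows that contain the given timestamp.
    static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, SLIDE_MS);
        for (long start = lastStart; start > ts - SIZE_MS; start -= SLIDE_MS) {
            starts.add(start);
        }
        return starts;
    }

    // Assumed trigger rule: fire once the watermark reaches the window's last millisecond.
    static boolean fires(long windowStart, long watermark) {
        return watermark >= windowStart + SIZE_MS - 1;
    }

    // Smallest event timestamp whose watermark makes the window fire.
    static long minTimestampToFire(long windowStart) {
        return windowStart + SIZE_MS - 1 + BOUND_MS;
    }

    public static void main(String[] args) {
        long[] sample = {1_000, 2_000, 3_000, 4_000}; // 13:10:01 .. 13:10:04
        long maxTs = Long.MIN_VALUE;
        for (long ts : sample) {
            maxTs = Math.max(maxTs, ts);
        }
        long watermark = watermarkFor(maxTs);
        System.out.println("watermark after sample input: " + watermark + " ms");
        for (long start : windowStarts(sample[0])) {
            System.out.println("window [" + start + ", " + (start + SIZE_MS) + ")"
                    + " fired: " + fires(start, watermark)
                    + ", needs an event at >= " + minTimestampToFire(start) + " ms");
        }
    }
}
```

Under these assumptions the watermark after the four sample events is 13:09:54, so the earliest window, [13:09:55, 13:10:05), cannot fire until an event with a timestamp of 13:10:14.999 or later arrives.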
On 14.12.2017 17:57, Fabian Hueske wrote:
Hi,
you are using a BoundedOutOfOrdernessTimestampExtractor to generate watermarks.
The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark assigner and only generates watermarks if a watermark interval is configured.
Without watermarks, the query cannot "make progress" and only computes its result when the program is closed (sources emit a MAX_LONG watermark when being canceled).
Long story short: you need to configure the watermark interval: env.getConfig.setAutoWatermarkInterval(100L);
Best, Fabian
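As a rough illustration of why the interval matters, the toy model below (plain Java, not Flink's actual implementation) polls a bounded-out-of-orderness assigner on a fixed processing-time interval and records each watermark that advances. The class name, method name and arrival times are hypothetical. With an interval of 0 the assigner is never polled, so no watermark is emitted. In the Java API the call above would be written env.getConfig().setAutoWatermarkInterval(100L).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of periodic watermark emission; names are hypothetical, not Flink APIs.
class PeriodicWatermarkSketch {
    static final long BOUND_MS = 10_000; // out-of-orderness bound

    // arrivalMs: processing time at which each event arrives (ascending)
    // eventTs:   event-time timestamp carried by each event
    // Returns the watermarks emitted when polling every intervalMs up to endMs.
    static List<Long> emittedWatermarks(long[] arrivalMs, long[] eventTs,
                                        long intervalMs, long endMs) {
        List<Long> emitted = new ArrayList<>();
        if (intervalMs <= 0) {
            return emitted; // no interval configured: the assigner is never polled
        }
        long maxTs = Long.MIN_VALUE + BOUND_MS; // avoids underflow before the first event
        long lastEmitted = Long.MIN_VALUE;
        int next = 0;
        for (long now = intervalMs; now <= endMs; now += intervalMs) {
            // Account for every event that has arrived by this tick.
            while (next < arrivalMs.length && arrivalMs[next] <= now) {
                maxTs = Math.max(maxTs, eventTs[next]);
                next++;
            }
            long candidate = maxTs - BOUND_MS;
            if (candidate > lastEmitted) { // only emit when the watermark advances
                emitted.add(candidate);
                lastEmitted = candidate;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] arrival = {50, 150, 250, 350};          // hypothetical arrival times
        long[] ts = {1_000, 2_000, 3_000, 4_000};      // ms relative to 13:10:00
        System.out.println("interval 100 ms: " + emittedWatermarks(arrival, ts, 100, 400));
        System.out.println("interval 0 ms:   " + emittedWatermarks(arrival, ts, 0, 400));
    }
}
```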
2017-12-14 16:30 GMT+01:00 Plamen Paskov <[hidden email]> :
Hi,
I'm trying to run the following streaming program in my local Flink 1.3.2 environment. The program compiles and runs without any errors, but the print() call doesn't display anything. Once I stop the program, I receive all the aggregated data. Any ideas how to make it produce output regularly, or whenever new data arrives or old data is updated?
package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);

        SingleOutputStreamOperator<WC> input = env
            .socketTextStream("localhost", 9000, "\n")
            .map(new MapFunction<String, WC>() {
                @Override
                public WC map(String value) throws Exception {
                    String[] row = value.split(",");
                    Timestamp timestamp = Timestamp.valueOf(row[2]);
                    return new WC(row[0], Long.valueOf(row[1]), timestamp);
                }
            })
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(WC element) {
                    return element.dt.getTime();
                }
            });

        tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");

        Table table = tEnv.scan("WordCount")
            .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
            .groupBy("w, word")
            .select("word, frequency.sum as frequency, w.start as dt");

        DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
        result.print();

        env.execute();
    }

    public static class WC {
        public String word;
        public long frequency;
        public Timestamp dt;

        public WC() {
        }

        public WC(String word, long frequency, Timestamp dt) {
            this.word = word;
            this.frequency = frequency;
            this.dt = dt;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency + " " + dt.getTime();
        }
    }
}
Sample input:
hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04
Thanks