Hi,

I'm trying to run the following streaming program in my local Flink 1.3.2 environment. The program compiles and runs without any errors, but the print() call doesn't display anything. Once I stop the program, I receive all the aggregated data. Any ideas how to make it output regularly, or whenever new data arrives or old data is updated?

package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);

        // Parse "word,frequency,timestamp" lines read from the socket.
        SingleOutputStreamOperator<WC> input = env
                .socketTextStream("localhost", 9000, "\n")
                .map(new MapFunction<String, WC>() {
                    @Override
                    public WC map(String value) throws Exception {
                        String[] row = value.split(",");
                        Timestamp timestamp = Timestamp.valueOf(row[2]);
                        return new WC(row[0], Long.valueOf(row[1]), timestamp);
                    }
                })
                // Watermarks trail the highest timestamp seen so far by 10 seconds.
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(WC element) {
                                return element.dt.getTime();
                            }
                        });

        tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");

        // 10-second sliding window that advances every 5 seconds, grouped by word.
        Table table = tEnv.scan("WordCount")
                .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
                .groupBy("w, word")
                .select("word, frequency.sum as frequency, w.start as dt");

        DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
        result.print();

        env.execute();
    }

    public static class WC {
        public String word;
        public long frequency;
        public Timestamp dt;

        public WC() {
        }

        public WC(String word, long frequency, Timestamp dt) {
            this.word = word;
            this.frequency = frequency;
            this.dt = dt;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency + " " + dt.getTime();
        }
    }
}

Sample input: hello,1,2017-12-14 13:10:01

Thanks
Hi,

You are using a BoundedOutOfOrdernessTimestampExtractor to generate watermarks. The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark assigner and only generates watermarks if a watermark interval is configured.

2017-12-14 16:30 GMT+01:00 Plamen Paskov <[hidden email]>:
Hi Fabian,

Thank you for your response! I don't think that's necessary, because I already call env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); which does exactly what you describe: it sets the watermark interval to 200 ms.

I think I found the problem: it is the default event-time trigger attached to the assigner. According to the docs at https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html: "all the event-time window assigners have an EventTimeTrigger as default trigger. This trigger simply fires once the watermark passes the end of a window." All I have to do to trigger the computation is send an event that falls into the "next" window.

So the question now is: how can I set the trigger to fire at regular intervals (e.g. every 5 seconds) using the Table API?

On 14.12.2017 17:57, Fabian Hueske wrote:
Hi,

Yes, you are right. I forgot that the interval is set by default when enabling event time. Your comment about triggering the window is also correct. Technically, you don't need a record that falls into the next window, just a watermark that is past the window boundary. In your case, watermarks only advance when the assigner sees more records, so you'd need a record with a timestamp of at least 2017-12-14 13:10:15 (or 16), because the watermark assigner subtracts 10 seconds.

Given the current watermark assigner, there is no way to trigger a window computation other than sending more records. You can implement a custom assigner that also emits watermarks without data, but that would tie the event-time watermarks to the clock of the generating machine, so the watermarks would no longer be purely data-driven.

Best, Fabian

2017-12-14 17:25 GMT+01:00 Plamen Paskov <[hidden email]>:
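To make the timestamp arithmetic from Fabian's reply concrete, here is a minimal plain-Java sketch with no Flink dependency. It assumes the semantics described in this thread: a 10-second window sliding every 5 seconds, a watermark equal to the highest timestamp seen minus 10 seconds, and a window that fires once the watermark reaches the window end minus 1 ms. The class WatermarkMath and its method names are made up for illustration, and the sample timestamp is interpreted as UTC for simplicity (Timestamp.valueOf in the original program uses the local time zone).

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: models the window and watermark arithmetic only.
public class WatermarkMath {
    static final long WINDOW_SIZE_MS = 10_000L;          // Slide.over("10.seconds")
    static final long SLIDE_MS = 5_000L;                 // .every("5.seconds")
    static final long MAX_OUT_OF_ORDERNESS_MS = 10_000L; // Time.seconds(10)

    // End timestamps (exclusive) of every sliding window that contains the event timestamp.
    public static List<Long> windowEnds(long timestamp) {
        List<Long> ends = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, SLIDE_MS);
        for (long start = lastStart; start > timestamp - WINDOW_SIZE_MS; start -= SLIDE_MS) {
            ends.add(start + WINDOW_SIZE_MS);
        }
        return ends;
    }

    // Watermark after the assigner has seen maxTimestampSeen (assumed: max minus the bound).
    public static long watermarkFor(long maxTimestampSeen) {
        return maxTimestampSeen - MAX_OUT_OF_ORDERNESS_MS;
    }

    // Smallest record timestamp that pushes the watermark far enough to fire the window.
    public static long minTimestampToFire(long windowEnd) {
        return (windowEnd - 1) + MAX_OUT_OF_ORDERNESS_MS;
    }

    // Parses an ISO local date-time and treats it as UTC (an assumption of this sketch).
    public static long millis(String isoLocalDateTime) {
        return LocalDateTime.parse(isoLocalDateTime).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        long sample = millis("2017-12-14T13:10:01"); // the sample input record
        for (long end : windowEnds(sample)) {
            System.out.println("window ending at " + Instant.ofEpochMilli(end)
                    + " fires once a record with timestamp >= "
                    + Instant.ofEpochMilli(minTimestampToFire(end)) + " arrives");
        }
    }
}
```

Under these assumptions the sample record belongs to two windows, ending at 13:10:05 and 13:10:10. The first can fire once a record stamped 13:10:14.999 or later arrives, which matches the 13:10:15 figure above; the second needs a record stamped 13:10:19.999 or later.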