Hi all,
I am building a DataStream example in Flink with a fake source that generates LogLine(Date d, String line) records. I want to work with watermarks on it, so I created a class that implements AssignerWithPeriodicWatermarks. Without the ".timeWindow(Time.seconds(2))" operator on the data stream, I can group by second and concatenate the lines. When I add ".timeWindow(Time.seconds(2))", nothing is printed to the output. I guess I misunderstood something when I was reading about event time. Could anyone help me, please? My source code is as follows. Thanks for the ideas.

Kind regards,
Felipe

```java
import flink.util.LogLine;
import flink.util.LogSourceFunction;
import flink.util.UtilDate;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

import javax.annotation.Nullable;
import java.util.Date;

public class EventTimeStreamExampleJava {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<LogLine> dataStream = env
                .addSource(new LogSourceFunction())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
                .keyBy(lineLog -> lineLog.getSec())
                // .timeWindow(Time.seconds(2))
                .reduce((log1, log2) -> new LogLine(log1.getTime(), log1.getLine() + " | " + log2.getLine()));

        dataStream.print();

        env.execute("Window LogRead");
    }

    public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<LogLine> {

        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(LogLine element, long previousElementTimestamp) {
            long timestamp = element.getTime().getSeconds();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}
```

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LogSourceFunction implements SourceFunction<LogLine> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<LogLine> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(new LogLine(UtilDate.getRandomSec(), UtilDate.getRandomString()));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```

```java
package flink.util;

import java.util.Date;
import java.util.Objects;

public class LogLine {

    private Date time;
    private int sec;
    private String line;

    public LogLine() {
    }

    public LogLine(Date time, String line) {
        this.sec = time.getSeconds();
        this.time = time;
        this.line = line;
    }

    public LogLine(int sec, String line) {
        this.sec = sec;
        this.time = UtilDate.getRandomDate(sec);
        this.line = line;
    }

    public int getSec() {
        return sec;
    }

    public void setSec(int sec) {
        this.sec = sec;
    }

    public Date getTime() {
        return time;
    }

    public String getLine() {
        return line;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    public void setLine(String line) {
        this.line = line;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        LogLine logLine = (LogLine) o;
        return Objects.equals(time, logLine.time) &&
                Objects.equals(sec, logLine.sec) &&
                Objects.equals(line, logLine.line);
    }

    @Override
    public int hashCode() {
        return Objects.hash(time, sec, line);
    }

    @Override
    public String toString() {
        return "LogLine{" +
                "time=" + time +
                ", sec=" + sec +
                ", line='" + line +
                '}';
    }
}
```
Hi,

The timestamps of the stream records should be increasing (strict monotonicity is not required; a bit of out-of-orderness can be handled thanks to watermarks).

Also, a timestamp in Flink is the number of milliseconds since 1970-01-01 00:00:00 UTC. However, your timestamp extractor only returns the seconds field of the current minute (i.e., a value between 0 and 59). You should use Date.getTime() instead of Date.getSeconds().

2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <[hidden email]>:
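To make the timestamp problem concrete, here is a small plain-Java sketch (no Flink dependency) of the arithmetic behind the empty output. It mirrors the periodic assigner from the first message (watermark = highest timestamp seen minus 3500 ms) and the event-time firing rule for a tumbling window: a window [start, end) fires once the watermark reaches end - 1. The class and method names are hypothetical. With `getSeconds()` the timestamps stay between 0 and 59, so the watermark can never exceed 59 - 3500 < 0 and even the first 2-second window [0, 2000) never fires; with epoch milliseconds the watermark advances and windows fire. In the assigner itself, the fix described above is the one-line change `long timestamp = element.getTime().getTime();`.

```java
import java.util.Date;

// Hypothetical plain-Java simulation of the watermark logic (not Flink API).
class WatermarkArithmetic {

    static final long MAX_OUT_OF_ORDERNESS = 3500; // 3.5 seconds, as in the original assigner
    static final long WINDOW_SIZE = 2000;          // .timeWindow(Time.seconds(2))

    // Watermark a BoundedOutOfOrdernessGenerator-style assigner emits after
    // seeing the given timestamps: highest timestamp minus the bound.
    static long watermarkAfter(long[] timestamps) {
        long currentMax = 0; // same value as the uninitialized long field in the original
        for (long ts : timestamps) {
            currentMax = Math.max(currentMax, ts);
        }
        return currentMax - MAX_OUT_OF_ORDERNESS;
    }

    // Exclusive end of the tumbling window (offset 0) that contains the timestamp.
    static long windowEnd(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, WINDOW_SIZE);
        return start + WINDOW_SIZE;
    }

    // An event-time window fires once the watermark reaches its last timestamp (end - 1).
    static boolean windowFires(long timestamp, long watermark) {
        return watermark >= windowEnd(timestamp) - 1;
    }

    @SuppressWarnings("deprecation") // getSeconds() is deprecated; used only to reproduce the bug
    static long secondsOfMinute(long epochMillis) {
        return new Date(epochMillis).getSeconds();
    }

    public static void main(String[] args) {
        long base = 1_071_516_660_000L; // arbitrary start time in epoch milliseconds
        long[] epochMillis = new long[20];
        long[] secondsOnly = new long[20];
        for (int i = 0; i < 20; i++) {
            epochMillis[i] = base + i * 1000L; // one event per second for 20 seconds
            secondsOnly[i] = secondsOfMinute(epochMillis[i]);
        }
        long wmSeconds = watermarkAfter(secondsOnly);
        long wmMillis = watermarkAfter(epochMillis);
        System.out.println("getSeconds(): watermark=" + wmSeconds
                + ", first window fires=" + windowFires(secondsOnly[0], wmSeconds));
        System.out.println("getTime():    watermark=" + wmMillis
                + ", first window fires=" + windowFires(epochMillis[0], wmMillis));
    }
}
```

Running `main` prints a negative watermark and `false` for the seconds-based timestamps, and `true` for the millisecond timestamps.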
Thanks a lot, Fabian, that clarified how I should develop this.

I am now using the keyBy, timeWindow, and apply operators in EventTimeStreamExampleJava. LogSourceFunction now generates dates in order, with a bit of out-of-orderness, and LogLine only uses the Date as its key.

If I understood watermarks correctly, my program should combine all the lines that fall inside the same window when I set ".timeWindow(Time.seconds(5), Time.seconds(1))" and use ".apply(new LogLineCounterFunction())". But this is still not happening, because I didn't choose a good key (".keyBy(lineLog -> lineLog.getTime())"), and the key type in the LogLineCounterFunction class is still Date:

```java
public static class LogLineCounterFunction implements WindowFunction<
        LogLine,                        // input
        Tuple3<LogLine, Long, Integer>, // output
        Date,                           // key
        TimeWindow> {                   // window type
    // apply(...) implementation not included in the original message
}
```

What should I use as a key in my case? My output only combines the lines with the same key (Date). I want to combine all the dates within the window ".timeWindow(Time.seconds(5), Time.seconds(1))"...
```
3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534},1071516670000,9)
3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184},1071516670000,4)
3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884},1071516670000,12)
3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15 16:31:03.784},1071516670000,1)
3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334},1071516670000,4)
```

On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske <[hidden email]> wrote:
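The output above follows from how keyed windows work: a window is evaluated separately for every key, so with `keyBy(lineLog -> lineLog.getTime())` each distinct timestamp forms its own group and every result only combines lines that carry exactly the same `Date`. The plain-Java sketch below models that grouping for a single window under simplifying assumptions (timestamps as `long` milliseconds, one window [0, 5000)); `KeyedWindowGrouping` and `groupInWindow` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical plain-Java model of a keyed window (not Flink API):
// records that fall into one window are grouped per key, one result per group.
class KeyedWindowGrouping {

    // Groups the timestamps in [windowStart, windowEnd) by the key that keyOf assigns to them.
    static <K extends Comparable<K>> Map<K, List<Long>> groupInWindow(
            List<Long> timestamps, long windowStart, long windowEnd, Function<Long, K> keyOf) {
        Map<K, List<Long>> groups = new TreeMap<>();
        for (long ts : timestamps) {
            if (ts >= windowStart && ts < windowEnd) {
                groups.computeIfAbsent(keyOf.apply(ts), k -> new ArrayList<>()).add(ts);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        // Millisecond timestamps where several lines share the exact same instant,
        // similar to the repeated dates in the posted output.
        List<Long> events = Arrays.asList(884L, 884L, 884L, 3784L, 4184L, 4184L);

        // Key = full timestamp, like keyBy(lineLog -> lineLog.getTime()):
        // prints {884=[884, 884, 884], 3784=[3784], 4184=[4184, 4184]}
        System.out.println(groupInWindow(events, 0, 5000, t -> t));

        // One constant key for every record, which is what a non-keyed window amounts to:
        // prints {all=[884, 884, 884, 3784, 4184, 4184]}
        System.out.println(groupInWindow(events, 0, 5000, t -> "all"));
    }
}
```

The number of results per window equals the number of distinct keys in it, so a key as fine-grained as the full timestamp splits a window into many small results.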
If you don't want to partition by key, i.e., if you want a single result for each time window, you should not use keyBy but a non-keyed window (windowAll, or timeWindowAll) instead. However, this will only be executed with a parallelism of 1.

2018-03-19 13:54 GMT+01:00 Felipe Gutierrez <[hidden email]>:
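To see what a sliding all-window such as `timeWindowAll(Time.seconds(5), Time.seconds(1))` does with each line, here is a plain-Java sketch of the window-assignment arithmetic for sliding event-time windows with zero offset, modeled on the logic of Flink's `SlidingEventTimeWindows` assigner; `SlidingWindowAssignment` and `assignWindows` are hypothetical names. A window of size 5 s and slide 1 s puts every element into size / slide = 5 overlapping windows, and since there is no key, each of those windows produces exactly one combined result. Because every window has to see all records, the non-keyed operator cannot be split across parallel instances, which is why it runs with a parallelism of 1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch of sliding event-time window assignment (offset 0).
class SlidingWindowAssignment {

    // Returns every [start, end) window that contains the timestamp, latest start first.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide); // align to the slide
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Size 5000 ms and slide 1000 ms, as in timeWindowAll(Time.seconds(5), Time.seconds(1))
        for (long[] w : assignWindows(8534, 5000, 1000)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For a timestamp of 8534 ms, `main` prints the five windows [8000, 13000), [7000, 12000), [6000, 11000), [5000, 10000) and [4000, 9000).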
Thanks, I did it using ".timeWindowAll(Time.seconds(5), Time.seconds(1)).apply(new LogLineAllWindowFunction());". My output now only includes the values inside each window.

Thanks,
Felipe

On Mon, Mar 19, 2018 at 10:54 AM, Fabian Hueske <[hidden email]> wrote: