How to correctly use timeWindow() with DataStream?

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

How to correctly use timeWindow() with DataStream?

Felipe Gutierrez
Hi all,

I am building a Flink DataStream example with a fake source that generates LogLine(Date d, String line) records. I want to work with watermarks on it, so I created a class that implements AssignerWithPeriodicWatermarks. If I don't apply the ".timeWindow(Time.seconds(2))" operator to the data stream, I can group by second and concatenate the lines. When I do apply ".timeWindow(Time.seconds(2))", nothing is shown in the output. I guess I misunderstood something when I was reading about event time. Could anyone help me, please? My source code is as follows.

Thanks for the ideas. Kind Regards,  Felipe

package flink.example.streaming;

import flink.util.LogLine;
import flink.util.LogSourceFunction;
import flink.util.UtilDate;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

import javax.annotation.Nullable;
import java.util.Date;

public class EventTimeStreamExampleJava {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<LogLine> dataStream = env
                .addSource(new LogSourceFunction())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
                .keyBy(lineLog -> lineLog.getSec())
                // .timeWindow(Time.seconds(2))
                .reduce((log1, log2) -> new LogLine(log1.getTime(), log1.getLine() + " | " + log2.getLine()))
                ;

        dataStream.print();

        env.execute("Window LogRead");
    }

    public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<LogLine> {

        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(LogLine element, long previousElementTimestamp) {
            long timestamp = element.getTime().getSeconds();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}

package flink.util;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LogSourceFunction implements SourceFunction<LogLine> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<LogLine> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(new LogLine(UtilDate.getRandomSec(), UtilDate.getRandomString()));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

package flink.util;

import java.util.Date;
import java.util.Objects;

public class LogLine {

    private Date time;
    private int sec;
    private String line;

    public LogLine() {
    }

    public LogLine(Date time, String line) {
        this.sec = time.getSeconds();
        this.time = time;
        this.line = line;
    }

    public LogLine(int sec, String line) {
        this.sec = sec;
        this.time = UtilDate.getRandomDate(sec);
        this.line = line;
    }

    public int getSec() {
        return sec;
    }

    public void setSec(int sec) {
        this.sec = sec;
    }

    public Date getTime() {
        return time;
    }

    public String getLine() {
        return line;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    public void setLine(String line) {
        this.line = line;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        LogLine logLine = (LogLine) o;
        return Objects.equals(time, logLine.time) &&
                Objects.equals(sec, logLine.sec) &&
                Objects.equals(line, logLine.line);
    }

    @Override
    public int hashCode() {

        return Objects.hash(time, sec, line);
    }

    @Override
    public String toString() {
        return "LogLine{" +
                "time=" + time +
                ", sec=" + sec +
                ", line='" + line +
                '}';
    }
}


--
--
-- Felipe Oliveira Gutierrez
-- skype: felipe.o.gutierrez
Reply | Threaded
Open this post in threaded view
|

Re: How to correctly use timeWindow() with DataStream?

Fabian Hueske-2
Hi,

The timestamps of the stream records should be increasing (strict monotonicity is not required, a bit out of orderness can be handled due to watermarks).
So, the events should also be generated with increasing timestamps. It looks like your generator generates random dates. I'd also generate data with millisecond precision, not just days.

Also, a timestamp in Flink is the number of milliseconds since 1970-01-01-00:00:00.
However, your timestamp extractor only returns the seconds within the current minute (i.e., a value from 0 to 59). You should use Date.getTime() instead of Date.getSeconds().

Best, Fabian

2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <[hidden email]>:
Hi all,

I am building an example with DataStream using Flink that has a fake source generator of LogLine(Date d, String line). I want to work with Watermarks on it so I created a class that implements AssignerWithPeriodicWatermarks. If I don't use the monad ".timeWindow(Time.seconds(2))" on the data stream I can group by second and concatenate the lines. When I use ".timeWindow(Time.seconds(2))" nothing is shown on the output. I guess I misunderstood something when I was reading about Event Time. Could anyone help me please? My source code is as follow.

Thanks for the ideas. Kind Regards,  Felipe

package flink.example.streaming;

import flink.util.LogLine;
import flink.util.LogSourceFunction;
import flink.util.UtilDate;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

import javax.annotation.Nullable;
import java.util.Date;

public class EventTimeStreamExampleJava {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<LogLine> dataStream = env
                .addSource(new LogSourceFunction())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
                .keyBy(lineLog -> lineLog.getSec())
                // .timeWindow(Time.seconds(2))
                .reduce((log1, log2) -> new LogLine(log1.getTime(), log1.getLine() + " | " + log2.getLine()))
                ;

        dataStream.print();

        env.execute("Window LogRead");
    }

    public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<LogLine> {

        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(LogLine element, long previousElementTimestamp) {
            long timestamp = element.getTime().getSeconds();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}

package flink.util;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LogSourceFunction implements SourceFunction<LogLine> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<LogLine> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(new LogLine(UtilDate.getRandomSec(), UtilDate.getRandomString()));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

package flink.util;

import java.util.Date;
import java.util.Objects;

public class LogLine {

    private Date time;
    private int sec;
    private String line;

    public LogLine() {
    }

    public LogLine(Date time, String line) {
        this.sec = time.getSeconds();
        this.time = time;
        this.line = line;
    }

    public LogLine(int sec, String line) {
        this.sec = sec;
        this.time = UtilDate.getRandomDate(sec);
        this.line = line;
    }

    public int getSec() {
        return sec;
    }

    public void setSec(int sec) {
        this.sec = sec;
    }

    public Date getTime() {
        return time;
    }

    public String getLine() {
        return line;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    public void setLine(String line) {
        this.line = line;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        LogLine logLine = (LogLine) o;
        return Objects.equals(time, logLine.time) &&
                Objects.equals(sec, logLine.sec) &&
                Objects.equals(line, logLine.line);
    }

    @Override
    public int hashCode() {

        return Objects.hash(time, sec, line);
    }

    @Override
    public String toString() {
        return "LogLine{" +
                "time=" + time +
                ", sec=" + sec +
                ", line='" + line +
                '}';
    }
}


--
--
-- Felipe Oliveira Gutierrez
-- skype: felipe.o.gutierrez

Reply | Threaded
Open this post in threaded view
|

Re: How to correctly use timeWindow() with DataStream?

Felipe Gutierrez
thanks a lot Fabian,

That clarified how I should develop this. I am now using the keyBy, timeWindow, and apply operators in EventTimeStreamExampleJava. LogSourceFunction now generates dates in order, with a bit of out-of-orderness, and I am using only the Date of the LogLine object as my key.

If I understood watermarks well, my program should combine all the lines that are inside the same watermark when I set ".timeWindow(Time.seconds(5), Time.seconds(1))" and used ".apply(new LogLineCounterFunction())". But it is still not happening because I didn't use a good key ".keyBy(lineLog -> lineLog.getTime())" and my key at the LogLineCounterFunction class is still the Date.

public static class LogLineCounterFunction implements WindowFunction<
        LogLine, // input
        Tuple3<LogLine, Long, Integer>, // output
        Date, // key
        TimeWindow> { // window type

What should I use as a key in my case?

My output is combining only the lines with the same key (Date). I want to combine the dates between the watermarks ".timeWindow(Time.seconds(5), Time.seconds(1))"...

3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534},1071516670000,9)
3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184},1071516670000,4)
3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884},1071516670000,12)
3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15 16:31:03.784},1071516670000,1)
3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334},1071516670000,4)




On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

The timestamps of the stream records should be increasing (strict monotonicity is not required, a bit out of orderness can be handled due to watermarks).
So, the events should also be generated with increasing timestamps. It looks like your generator generates random dates. I'd also generate data with millisecond precision, not just days.

Also, a timestamp in Flink is the number of milliseconds since 1970-01-01-00:00:00.
However, your timestamp extractor only returns the number of seconds since last minute (i.e., from 0 to 60). You should use Date.getTime() instead of Date.getSeconds().

Best, Fabian

2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <[hidden email]>:
Hi all,

I am building an example with DataStream using Flink that has a fake source generator of LogLine(Date d, String line). I want to work with Watermarks on it so I created a class that implements AssignerWithPeriodicWatermarks. If I don't use the monad ".timeWindow(Time.seconds(2))" on the data stream I can group by second and concatenate the lines. When I use ".timeWindow(Time.seconds(2))" nothing is shown on the output. I guess I misunderstood something when I was reading about Event Time. Could anyone help me please? My source code is as follow.

Thanks for the ideas. Kind Regards,  Felipe

package flink.example.streaming;

import flink.util.LogLine;
import flink.util.LogSourceFunction;
import flink.util.UtilDate;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

import javax.annotation.Nullable;
import java.util.Date;

public class EventTimeStreamExampleJava {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<LogLine> dataStream = env
                .addSource(new LogSourceFunction())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
                .keyBy(lineLog -> lineLog.getSec())
                // .timeWindow(Time.seconds(2))
                .reduce((log1, log2) -> new LogLine(log1.getTime(), log1.getLine() + " | " + log2.getLine()))
                ;

        dataStream.print();

        env.execute("Window LogRead");
    }

    public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<LogLine> {

        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(LogLine element, long previousElementTimestamp) {
            long timestamp = element.getTime().getSeconds();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}

package flink.util;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LogSourceFunction implements SourceFunction<LogLine> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<LogLine> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(new LogLine(UtilDate.getRandomSec(), UtilDate.getRandomString()));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

package flink.util;

import java.util.Date;
import java.util.Objects;

public class LogLine {

    private Date time;
    private int sec;
    private String line;

    public LogLine() {
    }

    public LogLine(Date time, String line) {
        this.sec = time.getSeconds();
        this.time = time;
        this.line = line;
    }

    public LogLine(int sec, String line) {
        this.sec = sec;
        this.time = UtilDate.getRandomDate(sec);
        this.line = line;
    }

    public int getSec() {
        return sec;
    }

    public void setSec(int sec) {
        this.sec = sec;
    }

    public Date getTime() {
        return time;
    }

    public String getLine() {
        return line;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    public void setLine(String line) {
        this.line = line;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        LogLine logLine = (LogLine) o;
        return Objects.equals(time, logLine.time) &&
                Objects.equals(sec, logLine.sec) &&
                Objects.equals(line, logLine.line);
    }

    @Override
    public int hashCode() {

        return Objects.hash(time, sec, line);
    }

    @Override
    public String toString() {
        return "LogLine{" +
                "time=" + time +
                ", sec=" + sec +
                ", line='" + line +
                '}';
    }
}


--
--
-- Felipe Oliveira Gutierrez
-- skype: felipe.o.gutierrez




--
--
-- Felipe Oliveira Gutierrez
-- skype: felipe.o.gutierrez
Reply | Threaded
Open this post in threaded view
|

Re: How to correctly use timeWindow() with DataStream?

Fabian Hueske-2
If you don't want to partition by key, i.e., you want a single result for each time window, you should not use keyBy; use an all-window (e.g., timeWindowAll) instead.
However, this will only be executed with a parallelism of 1.

2018-03-19 13:54 GMT+01:00 Felipe Gutierrez <[hidden email]>:
thanks a lot Fabian,

It clarified my way to developing. I am using keyBy, timeWindow, and apply monad operator at the EventTimeStreamExampleJava now. I am generating dates in order and with a bit out of orderness now at LogSourceFunction. And only using Date as my key at LogLine object.

If I understood watermarks well, my program should combine all the lines that are inside the same watermark when I set ".timeWindow(Time.seconds(5), Time.seconds(1))" and used ".apply(new LogLineCounterFunction())". But it is still not happening because I didn't use a good key ".keyBy(lineLog -> lineLog.getTime())" and my key at the LogLineCounterFunction class is still the Date.

public static class LogLineCounterFunction implements WindowFunction<
        LogLine, // input
        Tuple3<LogLine, Long, Integer>, // output
        Date, // key
        TimeWindow> { // window type

What should I use as a key in my case?

My output is combining only the lines with the same key (Date). I want to combine the dates between the watermarks ".timeWindow(Time.seconds(5), Time.seconds(1))"...

3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534},1071516670000,9)
3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184},1071516670000,4)
3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884},1071516670000,12)
3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15 16:31:03.784},1071516670000,1)
3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334},1071516670000,4)




On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

The timestamps of the stream records should be increasing (strict monotonicity is not required, a bit out of orderness can be handled due to watermarks).
So, the events should also be generated with increasing timestamps. It looks like your generator generates random dates. I'd also generate data with millisecond precision, not just days.

Also, a timestamp in Flink is the number of milliseconds since 1970-01-01-00:00:00.
However, your timestamp extractor only returns the number of seconds since last minute (i.e., from 0 to 60). You should use Date.getTime() instead of Date.getSeconds().

Best, Fabian

2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <[hidden email]>:
Hi all,

I am building an example with DataStream using Flink that has a fake source generator of LogLine(Date d, String line). I want to work with Watermarks on it so I created a class that implements AssignerWithPeriodicWatermarks. If I don't use the monad ".timeWindow(Time.seconds(2))" on the data stream I can group by second and concatenate the lines. When I use ".timeWindow(Time.seconds(2))" nothing is shown on the output. I guess I misunderstood something when I was reading about Event Time. Could anyone help me please? My source code is as follow.

Thanks for the ideas. Kind Regards,  Felipe

package flink.example.streaming;

import flink.util.LogLine;
import flink.util.LogSourceFunction;
import flink.util.UtilDate;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

import javax.annotation.Nullable;
import java.util.Date;

public class EventTimeStreamExampleJava {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<LogLine> dataStream = env
                .addSource(new LogSourceFunction())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
                .keyBy(lineLog -> lineLog.getSec())
                // .timeWindow(Time.seconds(2))
                .reduce((log1, log2) -> new LogLine(log1.getTime(), log1.getLine() + " | " + log2.getLine()))
                ;

        dataStream.print();

        env.execute("Window LogRead");
    }

    public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<LogLine> {

        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(LogLine element, long previousElementTimestamp) {
            long timestamp = element.getTime().getSeconds();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}

package flink.util;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LogSourceFunction implements SourceFunction<LogLine> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<LogLine> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(new LogLine(UtilDate.getRandomSec(), UtilDate.getRandomString()));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

package flink.util;

import java.util.Date;
import java.util.Objects;

public class LogLine {

    private Date time;
    private int sec;
    private String line;

    public LogLine() {
    }

    public LogLine(Date time, String line) {
        this.sec = time.getSeconds();
        this.time = time;
        this.line = line;
    }

    public LogLine(int sec, String line) {
        this.sec = sec;
        this.time = UtilDate.getRandomDate(sec);
        this.line = line;
    }

    public int getSec() {
        return sec;
    }

    public void setSec(int sec) {
        this.sec = sec;
    }

    public Date getTime() {
        return time;
    }

    public String getLine() {
        return line;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    public void setLine(String line) {
        this.line = line;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        LogLine logLine = (LogLine) o;
        return Objects.equals(time, logLine.time) &&
                Objects.equals(sec, logLine.sec) &&
                Objects.equals(line, logLine.line);
    }

    @Override
    public int hashCode() {

        return Objects.hash(time, sec, line);
    }

    @Override
    public String toString() {
        return "LogLine{" +
                "time=" + time +
                ", sec=" + sec +
                ", line='" + line +
                '}';
    }
}


--
--
-- Felipe Oliveira Gutierrez
-- skype: felipe.o.gutierrez




--
--
-- Felipe Oliveira Gutierrez
-- skype: felipe.o.gutierrez

Reply | Threaded
Open this post in threaded view
|

Re: How to correctly use timeWindow() with DataStream?

Felipe Gutierrez
thanks,

I did it using ".timeWindowAll(Time.seconds(5), Time.seconds(1)).apply(new LogLineAllWindowFunction());". My output now contains only the values inside the window.

thanks, Felipe



On Mon, Mar 19, 2018 at 10:54 AM, Fabian Hueske <[hidden email]> wrote:
If you don't want to partition by key, i.e., have a single result for each time window, you should not use keyBy and an allWindow.
However, this will only be executed with a parallelism of 1.

2018-03-19 13:54 GMT+01:00 Felipe Gutierrez <[hidden email]>:
thanks a lot Fabian,

It clarified my way to developing. I am using keyBy, timeWindow, and apply monad operator at the EventTimeStreamExampleJava now. I am generating dates in order and with a bit out of orderness now at LogSourceFunction. And only using Date as my key at LogLine object.

If I understood watermarks well, my program should combine all the lines that are inside the same watermark when I set ".timeWindow(Time.seconds(5), Time.seconds(1))" and used ".apply(new LogLineCounterFunction())". But it is still not happening because I didn't use a good key ".keyBy(lineLog -> lineLog.getTime())" and my key at the LogLineCounterFunction class is still the Date.

public static class LogLineCounterFunction implements WindowFunction<
        LogLine, // input
        Tuple3<LogLine, Long, Integer>, // output
        Date, // key
        TimeWindow> { // window type

What should I use as a key in my case?

My output is combining only the lines with the same key (Date). I want to combine the dates between the watermarks ".timeWindow(Time.seconds(5), Time.seconds(1))"...

3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534},1071516670000,9)
3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184},1071516670000,4)
3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884},1071516670000,12)
3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15 16:31:03.784},1071516670000,1)
3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334},1071516670000,4)




On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

The timestamps of the stream records should be increasing (strict monotonicity is not required, a bit out of orderness can be handled due to watermarks).
So, the events should also be generated with increasing timestamps. It looks like your generator generates random dates. I'd also generate data with millisecond precision, not just days.

Also, a timestamp in Flink is the number of milliseconds since 1970-01-01-00:00:00.
However, your timestamp extractor only returns the number of seconds since last minute (i.e., from 0 to 60). You should use Date.getTime() instead of Date.getSeconds().

Best, Fabian

2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <[hidden email]>:
Hi all,

I am building an example with DataStream using Flink that has a fake source generator of LogLine(Date d, String line). I want to work with Watermarks on it so I created a class that implements AssignerWithPeriodicWatermarks. If I don't use the monad ".timeWindow(Time.seconds(2))" on the data stream I can group by second and concatenate the lines. When I use ".timeWindow(Time.seconds(2))" nothing is shown on the output. I guess I misunderstood something when I was reading about Event Time. Could anyone help me please? My source code is as follow.

Thanks for the ideas. Kind Regards,  Felipe

package flink.example.streaming;

import flink.util.LogLine;
import flink.util.LogSourceFunction;
import flink.util.UtilDate;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

import javax.annotation.Nullable;
import java.util.Date;

public class EventTimeStreamExampleJava {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<LogLine> dataStream = env
                .addSource(new LogSourceFunction())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
                .keyBy(lineLog -> lineLog.getSec())
                // .timeWindow(Time.seconds(2))
                .reduce((log1, log2) -> new LogLine(log1.getTime(), log1.getLine() + " | " + log2.getLine()))
                ;

        dataStream.print();

        env.execute("Window LogRead");
    }

    public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<LogLine> {

        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(LogLine element, long previousElementTimestamp) {
            long timestamp = element.getTime().getSeconds();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}

package flink.util;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LogSourceFunction implements SourceFunction<LogLine> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<LogLine> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(new LogLine(UtilDate.getRandomSec(), UtilDate.getRandomString()));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

package flink.util;

import java.util.Date;
import java.util.Objects;

public class LogLine {

    private Date time;
    private int sec;
    private String line;

    public LogLine() {
    }

    public LogLine(Date time, String line) {
        this.sec = time.getSeconds();
        this.time = time;
        this.line = line;
    }

    public LogLine(int sec, String line) {
        this.sec = sec;
        this.time = UtilDate.getRandomDate(sec);
        this.line = line;
    }

    public int getSec() {
        return sec;
    }

    public void setSec(int sec) {
        this.sec = sec;
    }

    public Date getTime() {
        return time;
    }

    public String getLine() {
        return line;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    public void setLine(String line) {
        this.line = line;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        LogLine logLine = (LogLine) o;
        return Objects.equals(time, logLine.time) &&
                Objects.equals(sec, logLine.sec) &&
                Objects.equals(line, logLine.line);
    }

    @Override
    public int hashCode() {

        return Objects.hash(time, sec, line);
    }

    @Override
    public String toString() {
        return "LogLine{" +
                "time=" + time +
                ", sec=" + sec +
                ", line='" + line +
                '}';
    }
}


--
--
-- Felipe Oliveira Gutierrez
-- skype: felipe.o.gutierrez




--
--
-- Felipe Oliveira Gutierrez
-- skype: felipe.o.gutierrez




--
--
-- Felipe Oliveira Gutierrez
-- skype: felipe.o.gutierrez