In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

Yassine MARZOUGUI
Hi all,

With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window boundaries are detected, but with 1.2-SNAPSHOT the state keeps growing in memory and the window results are not emitted until the whole stream is processed. Is this a temporary behaviour due to the developments in 1.2-SNAPSHOT, or a bug?

I am using code similar to the following:

env.setParallelism(1);

DataStream<T> sessions = env
    .readTextFile()
    .flatMap()
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
    .keyBy(1)
    .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
    .apply().setParallelism(32);
                
sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();

Best,
Yassine

Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

rmetzger0
Hi Yassine,
Are you sure your watermark extractor is the same in both versions? It sounds a bit like the watermarks for the 1.2 code are not generated correctly.

Regards,
Robert


On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <[hidden email]> wrote:

Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

Yassine MARZOUGUI
Hi Robert,

Yes, I am using the same code, just switching the version in pom.xml to 1.2-SNAPSHOT and the cluster binaries to the latest compiled master (at the time of the question). Here is the watermark assignment:

.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
    @Override
    public long extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
        return tuple3.f0;
    }
})

Best,
Yassine
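
To make the watermark side of this assignment concrete, here is a minimal plain-Java sketch of what an ascending-timestamp extractor keeps track of. It is not Flink code: the class and method names are hypothetical, and it assumes (as the built-in extractor does, to my knowledge) that the watermark trails the highest timestamp seen by one millisecond and is only read when something polls it periodically.

```java
/** Simplified stand-in for an ascending-timestamp watermark generator (hypothetical, not Flink code). */
final class AscendingWatermarkModel {
    // Highest event timestamp observed so far; MIN_VALUE means "nothing seen yet".
    private long maxTimestamp = Long.MIN_VALUE;

    /** Records an element's timestamp and returns it unchanged, as an extractor would. */
    long onElement(long timestamp) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
        }
        return timestamp;
    }

    /** Watermark a periodic poll would read: one millisecond behind the highest timestamp. */
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarkModel model = new AscendingWatermarkModel();
        for (long ts : new long[] {1_000L, 2_000L, 5_000L}) {
            model.onElement(ts);
            System.out.println("ELEMENT @ " + ts
                    + " -> watermark if polled now: " + model.currentWatermark());
        }
    }
}
```

The point of the sketch is that the extractor itself only computes a candidate watermark. Whether that value ever reaches the window operator depends on something calling `currentWatermark()` at regular intervals.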

2016-12-05 11:24 GMT+01:00 Robert Metzger <[hidden email]>:

Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

Yassine MARZOUGUI
I forgot to mention: the watermark extractor is the one included in the Flink API.

2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <[hidden email]>:

Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

rmetzger0
I'll add Aljoscha and Kostas Kloudas to the conversation. They have the best overview of the changes to the window operator between 1.1 and 1.2.

On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <[hidden email]> wrote:

Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

Aljoscha Krettek
Hi,
Could you please try adding this custom watermark debugger to see what's going on with the element timestamps and watermarks:

public static class WatermarkDebugger<T>
        extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        System.out.println("ELEMENT: " + element);
        output.collect(element);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        System.out.println("WM: " + mark);
    }
}

You can use it like this:
input.transform("WatermarkDebugger", input.getType(), new WatermarkDebugger<Tuple2<String, Integer>>());

That should give us something to work with.

Cheers,
Aljoscha

On Mon, 5 Dec 2016 at 18:54 Robert Metzger <[hidden email]> wrote:

Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

Yassine MARZOUGUI
Hi Aljoscha,

Please excuse the late response; I was busy for the whole of last week.
I used the custom watermark debugger (with 1.1, I changed super.processWatermark(mark) to super.output.emitWatermark(mark)). Surprisingly, with 1.2 only one watermark is printed, at the end of the stream, with the value WM: Watermark @ 9223372036854775807 (Long.MAX_VALUE), whereas with 1.1 watermarks are printed periodically. I am using the following revision of 1.2-SNAPSHOT: https://github.com/apache/flink/tree/4e336c692b74f218ba09844a46f49534e3a210e9.

I uploaded the dataset I'm using as input here: https://drive.google.com/file/d/0BzERCAJnxXocNGpMTGMzX09id1U/view?usp=sharing. The first column corresponds to the timestamp.

You can find the code below. Thank you for your help.

import com.opencsv.CSVParser;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import java.util.*;

/**
 * Created by ymarzougui on 11/1/2016.
 */
public class SortedSessionsAssigner {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple3<Long,String,String>> waterMarked = env.readTextFile("file:///E:\\data\\anonymized.csv")
                .flatMap(new RichFlatMapFunction<String, Tuple3<Long,String,String>>() {
                    public CSVParser csvParser;

                    @Override
                    public void open(Configuration config) {
                        csvParser = new CSVParser(',', '"');
                    }

                    @Override
                    public void flatMap(String in, Collector<Tuple3<Long,String,String>> clctr) throws Exception {
                        String[] result = csvParser.parseLine(in);
                        clctr.collect(Tuple3.of(Long.parseLong(result[0]), result[1], result[2]));
                    }
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
                        return tuple3.f0;
                    }
                });

        DataStream<Tuple2<TreeMap<String, Double>, Long>> sessions = waterMarked
                .keyBy(1)
                .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
                .apply(new WindowFunction<Tuple3<Long,String,String>,Tuple2<TreeMap<String, Double>, Long>, Tuple, TimeWindow>() {

                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple3<Long, String, String>> iterable, Collector<Tuple2<TreeMap<String, Double>, Long>> collector) throws Exception {
                        TreeMap<String,Double> treeMap = new TreeMap<String, Double>();
                        Long session_count = 0L;
                        for (Tuple3<Long, String, String> tuple3 : iterable){
                            treeMap.put(tuple3.f2, treeMap.getOrDefault(tuple3.f2, 0.0) + 1);
                            session_count += 1;
                        }
                        collector.collect(Tuple2.of(treeMap, session_count));

                    }
                }).setParallelism(8);

        waterMarked.transform("WatermarkDebugger", waterMarked.getType(), new WatermarkDebugger<Tuple3<Long, String, String>>());

        //sessions.writeAsCsv("file:///E:\\data\\sessions.csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

        env.execute("Sorted Sessions Assigner");

    }

    public static class WatermarkDebugger<T>
            extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            System.out.println("ELEMENT: " + element);
            output.collect(element);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            // 1.2-snapshot
            super.processWatermark(mark);
            // 1.1-snapshot
            //super.output.emitWatermark(mark);
            System.out.println("WM: " + mark);
        }
    }

}

Best,
Yassine
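
The difference between the two debugger outputs described in this message can be reproduced with a small plain-Java simulation. This is only a sketch under assumptions: the class name, the `{processingTimeMs, eventTimestamp}` input layout, and the 200 ms interval are made up for illustration, and the model simply assumes that watermarks are read by a periodic timer and that a finite input ends with a final `Long.MAX_VALUE` watermark (which is what the debugger printed).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of timer-driven watermark emission; not Flink code. */
final class PeriodicWatermarkEmissionModel {
    /**
     * @param arrivals   pairs of {processingTimeMs, eventTimestamp}, sorted by processing time
     * @param intervalMs simulated period of the watermark timer
     * @param timerFires whether the periodic timer ever fires
     * @return the watermarks sent downstream, in emission order
     */
    static List<Long> run(long[][] arrivals, long intervalMs, boolean timerFires) {
        List<Long> emitted = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        long lastEmitted = Long.MIN_VALUE;
        long nextTimer = intervalMs;
        for (long[] arrival : arrivals) {
            long processingTime = arrival[0];
            // Fire every timer that became due before this element arrived.
            while (timerFires && nextTimer <= processingTime) {
                long candidate = maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
                if (candidate > lastEmitted) { // only emit when the watermark advances
                    emitted.add(candidate);
                    lastEmitted = candidate;
                }
                nextTimer += intervalMs;
            }
            maxTimestamp = Math.max(maxTimestamp, arrival[1]);
        }
        // End of a finite input: one final watermark that closes everything.
        emitted.add(Long.MAX_VALUE);
        return emitted;
    }

    public static void main(String[] args) {
        long[][] arrivals = {{0, 1_000}, {150, 2_000}, {250, 3_000}, {450, 9_000}};
        System.out.println("timer firing:     " + run(arrivals, 200, true));
        System.out.println("timer not firing: " + run(arrivals, 200, false));
    }
}
```

With the timer firing, intermediate watermarks appear between elements; with the timer silent, the only watermark is the final `Long.MAX_VALUE`, which matches the single `WM:` line seen with 1.2-SNAPSHOT.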

2016-12-06 5:57 GMT+01:00 Aljoscha Krettek <[hidden email]>:

Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

Aljoscha Krettek
Hi Yassine,
I managed to reproduce the problem. The cause is that we recently changed how the timer service is being cleaned up and now the watermark timers are not firing anymore.

I'll keep you posted and hope to find a solution fast.

Cheers,
Aljoscha

On Sun, 11 Dec 2016 at 22:10 Yassine MARZOUGUI <[hidden email]> wrote:

Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

Aljoscha Krettek
Hi Yassine,
For a slightly more detailed explanation: we internally changed how the timer system works, and this timer system is also used to periodically extract watermarks. Due to this change, watermarks are no longer extracted in your case.

Internally, your call resolves to something like this:

Env.readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval)

with the FileProcessingMode being set to PROCESS_ONCE.

To get back the old behaviour you can call this method directly with PROCESS_CONTINUOUSLY. This will keep the pipeline running and will also ensure that watermarks keep being extracted.

In your case, it is not strictly wrong to emit only one large watermark in the end because your processing is finite. I admit that the change from Flink 1.1 seems a bit strange but this should only occur in toy examples where the data is finite.

Does that help?

Cheers,
Aljoscha
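
Why a single final watermark is "not strictly wrong" for finite input, yet still makes state pile up, can be seen in a toy session-window model. The following is a plain-Java sketch, not Flink's window operator: the class and method names are hypothetical, it handles a single key with ascending timestamps only, and it assumes a session fires once the watermark reaches the last millisecond of the window (end minus one).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical single-key session-window model; not Flink code. */
final class SessionFiringModel {
    private final long gapMs;
    private final Deque<long[]> open = new ArrayDeque<>(); // each entry: {start, end, count}
    private final List<String> log = new ArrayList<>();
    private int peakOpen = 0;

    SessionFiringModel(long gapMs) {
        this.gapMs = gapMs;
    }

    /** Adds an element; timestamps are assumed to arrive in ascending order. */
    void element(long ts) {
        long[] last = open.peekLast();
        if (last != null && ts < last[1]) {
            last[1] = ts + gapMs; // still inside the gap: extend the current session
            last[2]++;
        } else {
            open.addLast(new long[] {ts, ts + gapMs, 1}); // gap exceeded: start a new session
        }
        peakOpen = Math.max(peakOpen, open.size());
    }

    /** Advances event time; every session whose last millisecond has passed fires. */
    void watermark(long wm) {
        while (!open.isEmpty() && open.peekFirst()[1] - 1 <= wm) {
            long[] s = open.pollFirst();
            log.add("wm=" + wm + " fires session start=" + s[0] + " count=" + s[2]);
        }
    }

    List<String> log() { return log; }

    int peakOpen() { return peakOpen; }

    /** Feeds all timestamps, optionally with a watermark after each one, then a final MAX watermark. */
    static SessionFiringModel run(long gapMs, long[] timestamps, boolean periodicWatermarks) {
        SessionFiringModel model = new SessionFiringModel(gapMs);
        for (long ts : timestamps) {
            model.element(ts);
            if (periodicWatermarks) {
                model.watermark(ts - 1);
            }
        }
        model.watermark(Long.MAX_VALUE);
        return model;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 60_000L, 1_000_000L, 1_100_000L, 2_000_000L};
        for (boolean periodic : new boolean[] {true, false}) {
            SessionFiringModel m = run(300_000L, ts, periodic);
            System.out.println("periodic=" + periodic + " peakOpen=" + m.peakOpen());
            m.log().forEach(System.out::println);
        }
    }
}
```

Both runs produce the same sessions with the same counts. The difference is when they fire and how many sessions are held open at once, which is the memory growth reported at the start of the thread.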

On Tue, 13 Dec 2016 at 18:17 Aljoscha Krettek <[hidden email]> wrote:
                .apply(new WindowFunction<Tuple3<Long,String,String>,Tuple2<TreeMap<String, Double>, Long>, Tuple, TimeWindow>() {

                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple3<Long, String, String>> iterable, Collector<Tuple2<TreeMap<String, Double>, Long>> collector) throws Exception {
                        TreeMap<String,Double> treeMap = new TreeMap<String, Double>();
                        Long session_count = 0L;
                        for (Tuple3<Long, String, String> tuple3 : iterable){
                            treeMap.put(tuple3.f2, treeMap.getOrDefault(tuple3.f2, 0.0) + 1);
                            session_count += 1;
                        }
                        collector.collect(Tuple2.of(treeMap, session_count));

                    }
                }).setParallelism(8);

        waterMarked.transform("WatermarkDebugger", waterMarked.getType(), new WatermarkDebugger<Tuple3<Long, String, String>>());

        //sessions.writeAsCsv("file:///E:\\data\\sessions.csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

        env.execute("Sorted Sessions Assigner");

    }

    public static class WatermarkDebugger<T>
            extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            System.out.println("ELEMENT: " + element);
            output.collect(element);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            // 1.2-snapshot
            super.processWatermark(mark);
            // 1.1-snapshot
            //super.output.emitWatermark(mark);
            System.out.println("WM: " + mark);
        }
    }

}

Best,
Yassine

2016-12-06 5:57 GMT+01:00 Aljoscha Krettek <[hidden email]>:
Hi,
could you please try adding this custom watermark debugger to see what's going on with the element timestamps and watermarks:

public static class WatermarkDebugger<T>
        extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        System.out.println("ELEMENT: " + element);
        output.collect(element);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        System.out.println("WM: " + mark);
    }
}

you can use it like this:
input.transform("WatermarkDebugger", input.getType(), new WatermarkDebugger<Tuple2<String, Integer>>());

That should give us something to work with.

Cheers,
Aljoscha

On Mon, 5 Dec 2016 at 18:54 Robert Metzger <[hidden email]> wrote:
I'll add Aljoscha and Kostas Kloudas to the conversation. They have the best overview of the changes to the window operator between 1.1 and 1.2.

On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <[hidden email]> wrote:
I forgot to mention: the watermark extractor is the one included in the Flink API.

2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <[hidden email]>:
Hi Robert,

Yes, I am using the same code, just switching the version in pom.xml to 1.2-SNAPSHOT and the cluster binaries to the latest compiled master (at the time of the question). Here is the watermark assignment:

.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
    @Override
    public long extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
        return tuple3.f0;
    }
})

Best,
Yassine

2016-12-05 11:24 GMT+01:00 Robert Metzger <[hidden email]>:
Hi Yassine,
are you sure your watermark extractor is the same in both versions? It sounds a bit like the watermarks for the 1.2 code are not generated correctly.

Regards,
Robert


On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <[hidden email]> wrote:
Hi all,

With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window boundaries are detected, but with 1.2-SNAPSHOT the state keeps growing in memory and the window results are not emitted until the whole stream is processed. Is this a temporary behaviour due to the developments in 1.2-SNAPSHOT, or a bug?

I am using code similar to the following:

env.setParallelism(1);

DataStream<T> sessions = env
    .readTextFile()
    .flatMap()
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
    .keyBy(1)
    .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
    .apply().setParallelism(32);

sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();

Best,
Yassine






Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing untill the whole stream is processed

Yassine MARZOUGUI
Hi Aljoscha,

Thanks a lot for the explanation. Using readFile with PROCESS_CONTINUOUSLY solves it. Two more questions though:

1. Is it possible to gracefully stop the job once it has read the input once?
2. Does the watermark extraction period depend on the watch interval, or should any watch interval (except -1L) work the same way?

In my case the input is indeed finite and static, but it contains hundreds of GBs, which made the window state quickly grow beyond the memory capacity. The fact that the window contents were fired periodically helped keep it small.
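To illustrate why periodic firing keeps the state small, here is a minimal plain-Java sketch. It is a simplified model, not Flink code: it tracks session windows for a single key over ascending timestamps and reports the peak number of buffered elements. The class name, method names, and sample timestamps are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Plain-Java model of session windows for a single key; this is not Flink code. */
class SessionBufferSketch {

    /**
     * Replays ascending event timestamps and returns the peak number of buffered elements.
     * A watermark of (timestamp - 1) follows every watermarkEvery-th element; pass 0 to
     * model a run in which no watermark arrives before the end of the input.
     */
    static int peakBuffered(long[] timestamps, long gapMillis, int watermarkEvery) {
        // Each entry is {sessionEnd, elementCount}; the oldest session sits at the head.
        Deque<long[]> sessions = new ArrayDeque<>();
        int buffered = 0;
        int peak = 0;
        for (int i = 0; i < timestamps.length; i++) {
            long ts = timestamps[i];
            long[] last = sessions.peekLast();
            if (last != null && ts < last[0]) {
                last[0] = ts + gapMillis; // element falls into the open session: extend it
                last[1]++;
            } else {
                sessions.addLast(new long[] {ts + gapMillis, 1}); // gap exceeded: new session
            }
            buffered++;
            peak = Math.max(peak, buffered);
            if (watermarkEvery > 0 && (i + 1) % watermarkEvery == 0) {
                buffered -= fire(sessions, ts - 1);
            }
        }
        // A final Long.MAX_VALUE watermark would release the rest; it cannot change the peak.
        return peak;
    }

    /** Removes every session whose last covered timestamp is at or below the watermark. */
    static int fire(Deque<long[]> sessions, long watermark) {
        int released = 0;
        while (!sessions.isEmpty() && sessions.peekFirst()[0] - 1 <= watermark) {
            released += (int) sessions.pollFirst()[1];
        }
        return released;
    }

    public static void main(String[] args) {
        long gap = 5 * 60 * 1000L; // 5-minute session gap, as in the job in this thread
        long[] ts = {0, 60_000, 120_000, 1_000_000, 1_060_000, 2_000_000, 2_060_000, 2_120_000};
        System.out.println("watermark after every element: peak = " + peakBuffered(ts, gap, 1));
        System.out.println("no watermark until the end:    peak = " + peakBuffered(ts, gap, 0));
    }
}
```

In this model the peak stays at roughly one session's worth of elements when watermarks arrive regularly, and equals the whole input when the only watermark comes at the end.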

Best,
Yassine

2016-12-14 10:38 GMT+01:00 Aljoscha Krettek <[hidden email]>:
Hi Yassine,
for a slightly more detailed explanation: we internally changed how the timer system works, and this timer system is also used to periodically extract watermarks. Due to this change, in your case we don't extract watermarks anymore.

Internally, your call resolves to something like this:

Env.readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval)

with the FileProcessingMode being set to PROCESS_ONCE.

To get back the old behaviour you can call this method directly with PROCESS_CONTINUOUSLY. This will keep the pipeline running and will also ensure that watermarks keep being extracted.
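As a minimal sketch of such a direct call, assuming the input is read line by line with TextInputFormat: the file path, the 1000 ms watch interval, the class name, and the job name are placeholders, not values from this thread. It needs the Flink streaming dependency on the classpath.

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousFileRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path; replace with the real input location.
        String path = "file:///path/to/input.csv";

        // Same source as readTextFile(path), but with the processing mode set explicitly.
        // The last argument is the directory watch interval in milliseconds (placeholder value).
        DataStream<String> lines = env.readFile(
                new TextInputFormat(new Path(path)),
                path,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                1000L);

        lines.print();
        env.execute("Continuous file read sketch");
    }
}
```

Because the source keeps monitoring the path, a job built this way does not finish on its own after the file has been read.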

In your case, it is not strictly wrong to emit only one large watermark at the end because your processing is finite. I admit that the change from Flink 1.1 seems a bit strange, but this should only occur in toy examples where the data is finite.

Does that help?

Cheers,
Aljoscha








Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing untill the whole stream is processed

Aljoscha Krettek
Hi,
right now, the only way to shut down a running pipeline is to cancel it. You can do that in the JobManager dashboard or with the bin/flink command. The watermark extraction period does not depend on the watch interval; it can be configured using env.getConfig().setAutoWatermarkInterval(long).
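To illustrate what the auto-watermark interval controls, here is a minimal plain-Java model with a simulated clock. It is not Flink code. It assumes a periodic assigner that reports the highest timestamp seen so far minus one and emits a watermark only when that value advances; the class name, method name, and sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of periodic watermark emission; this is not Flink code. */
class PeriodicWatermarkSketch {

    /**
     * arrivalMillis[i] is the simulated processing time at which element i arrives and
     * eventTimestamps[i] is its event timestamp. A timer fires every intervalMillis of
     * simulated processing time and emits a watermark if it has advanced.
     */
    static List<Long> emittedWatermarks(long[] arrivalMillis, long[] eventTimestamps,
                                        long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        List<Long> emitted = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;     // highest event timestamp seen so far
        long lastEmitted = Long.MIN_VALUE; // last watermark that was emitted
        long nextTimer = intervalMillis;
        for (int i = 0; i < arrivalMillis.length; i++) {
            // Fire every timer that is due before this element arrives.
            while (nextTimer <= arrivalMillis[i]) {
                long candidate = maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - 1;
                if (candidate > lastEmitted) {
                    emitted.add(candidate);
                    lastEmitted = candidate;
                }
                nextTimer += intervalMillis;
            }
            maxSeen = Math.max(maxSeen, eventTimestamps[i]);
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] arrival = {0, 50, 150, 250, 450};
        long[] event = {1000, 2000, 3000, 4000, 5000};
        System.out.println("interval 200 ms: " + emittedWatermarks(arrival, event, 200L));
        System.out.println("interval 100 ms: " + emittedWatermarks(arrival, event, 100L));
    }
}
```

In the model, a shorter interval only changes how often the current progress is reported; nothing in it depends on how the file source is watched.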

Cheers,
Aljoscha
