Window not emitting output after upgrade to Flink 1.1.1

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Window not emitting output after upgrade to Flink 1.1.1

Yassin Marzouki
Hi all,

The following code works under Flink 1.0.3, but under 1.1.1 it just switches to FINISHED and doesn't output any result.

stream.map(new RichMapFunction<String, Request>() {

        private ObjectMapper objectMapper;

        @Override
        public void open(Configuration parameters) {
            objectMapper = new ObjectMapper();
        }

        @Override
        public Request map(String value) throws Exception {
            return objectMapper.readValue(value, Request.class);
        }

    })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
        @Override
        public long extractAscendingTimestamp(Request req) {
            return req.ts;
        }
    })
    .map((Request req) -> new Tuple3<String, String, Integer>(req.userId, req.location, 1))
    .keyBy(0)
    .timeWindow(Time.minutes(10))
    .apply(
            (Tuple3<String, String, Integer> x, Tuple3<String, String, Integer> y) -> y,
            (Tuple key, TimeWindow w, Iterable<Tuple3<String, String, Integer>> itrbl, Collector<Tuple2<String, Integer>> clctr) -> {
                Tuple3<String, String, Integer> res = itrbl.iterator().next();
                clctr.collect(new Tuple2<>(res.f1, res.f2));
            })
    .print();

The problem is with the window operator because I could print results before it.

Best,
Yassine
Reply | Threaded
Open this post in threaded view
|

Re: Window not emitting output after upgrade to Flink 1.1.1

Kostas Kloudas
Hi Yassine,

Are you reading from a file and use ingestion time?
If yes, then the problem can be related to this:


Is this the case?

Best,
Kostas

On Aug 12, 2016, at 10:30 AM, Yassine Marzougui <[hidden email]> wrote:

Hi all,

The following code works under Flink 1.0.3, but under 1.1.1 it just switches to FINISHED and doesn't output any result.

stream.map(new RichMapFunction<String, Request>() {

        private ObjectMapper objectMapper;

        @Override
        public void open(Configuration parameters) {
            objectMapper = new ObjectMapper();
        }

        @Override
        public Request map(String value) throws Exception {
            return objectMapper.readValue(value, Request.class);
        }

    })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
        @Override
        public long extractAscendingTimestamp(Request req) {
            return req.ts;
        }
    })
    .map((Request req) -> new Tuple3<String, String, Integer>(req.userId, req.location, 1))
    .keyBy(0)
    .timeWindow(Time.minutes(10))
    .apply(
            (Tuple3<String, String, Integer> x, Tuple3<String, String, Integer> y) -> y,
            (Tuple key, TimeWindow w, Iterable<Tuple3<String, String, Integer>> itrbl, Collector<Tuple2<String, Integer>> clctr) -> {
                Tuple3<String, String, Integer> res = itrbl.iterator().next();
                clctr.collect(new Tuple2<>(res.f1, res.f2));
            })
    .print();

The problem is with the window operator because I could print results before it.

Best,
Yassine

Reply | Threaded
Open this post in threaded view
|

Re: Window not emitting output after upgrade to Flink 1.1.1

Yassin Marzouki
Hi Kostas,

Yes, that's the case. I will revert back to 1.0.3 until the bug is fixed.
Thank you.

Best,
Yassine

On Fri, Aug 12, 2016 at 10:34 AM, Kostas Kloudas <[hidden email]> wrote:
Hi Yassine,

Are you reading from a file and use ingestion time?
If yes, then the problem can be related to this:


Is this the case?

Best,
Kostas

On Aug 12, 2016, at 10:30 AM, Yassine Marzougui <[hidden email]> wrote:

Hi all,

The following code works under Flink 1.0.3, but under 1.1.1 it just switches to FINISHED and doesn't output any result.

stream.map(new RichMapFunction<String, Request>() {

        private ObjectMapper objectMapper;

        @Override
        public void open(Configuration parameters) {
            objectMapper = new ObjectMapper();
        }

        @Override
        public Request map(String value) throws Exception {
            return objectMapper.readValue(value, Request.class);
        }

    })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
        @Override
        public long extractAscendingTimestamp(Request req) {
            return req.ts;
        }
    })
    .map((Request req) -> new Tuple3<String, String, Integer>(req.userId, req.location, 1))
    .keyBy(0)
    .timeWindow(Time.minutes(10))
    .apply(
            (Tuple3<String, String, Integer> x, Tuple3<String, String, Integer> y) -> y,
            (Tuple key, TimeWindow w, Iterable<Tuple3<String, String, Integer>> itrbl, Collector<Tuple2<String, Integer>> clctr) -> {
                Tuple3<String, String, Integer> res = itrbl.iterator().next();
                clctr.collect(new Tuple2<>(res.f1, res.f2));
            })
    .print();

The problem is with the window operator because I could print results before it.

Best,
Yassine