Hi all,
The following code works under Flink 1.0.3, but under 1.1.1 it just switches to FINISHED and doesn't output any result.

    stream.map(new RichMapFunction<String, Request>() {
            private ObjectMapper objectMapper;

            @Override
            public void open(Configuration parameters) {
                objectMapper = new ObjectMapper();
            }

            @Override
            public Request map(String value) throws Exception {
                return objectMapper.readValue(value, Request.class);
            }
        })
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
            @Override
            public long extractAscendingTimestamp(Request req) {
                return req.ts;
            }
        })
        .map((Request req) -> new Tuple3<String, String, Integer>(req.userId, req.location, 1))
        .keyBy(0)
        .timeWindow(Time.minutes(10))
        .apply(
            (Tuple3<String, String, Integer> x, Tuple3<String, String, Integer> y) -> y,
            (Tuple key, TimeWindow w, Iterable<Tuple3<String, String, Integer>> itrbl, Collector<Tuple2<String, Integer>> clctr) -> {
                Tuple3<String, String, Integer> res = itrbl.iterator().next();
                clctr.collect(new Tuple2<>(res.f1, res.f2));
            })
        .print();

The problem is with the window operator because I could print results before it.

Best,
Yassine
|
Hi Yassine,
Are you reading from a file and using ingestion time? If so, the problem may be related to this:
Is this the case?
Best,
Kostas
|
Hi Kostas,
Yes, that's the case. I will revert to 1.0.3 until the bug is fixed. Thank you.
Best,
Yassine
On Fri, Aug 12, 2016 at 10:34 AM, Kostas Kloudas <[hidden email]> wrote:
|