Re: missing data in window.reduce() while apply() is ok

Aljoscha Krettek
Hi,
Could you please go into more detail about the input and the expected output? And what output do you actually get with apply() and with reduce()?

With this we might be able to figure it out together.

Cheers,
Aljoscha
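
For illustration, the kind of detail asked for above could be prepared outside Flink: take a handful of hand-written events for one window and compute the per-key totals you would expect to see. This is only a minimal sketch in plain Java. The "key" field and the ExpectedWindowOutput class are hypothetical stand-ins, since the CompoundJsonKeySelector and the real event layout are not shown in the thread. Only the "count" field comes from the posted code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink dependency): computes the per-key totals one
// would expect a single window to emit for a small, hand-written input.
class ExpectedWindowOutput {

    // Hypothetical event shape: "key" is an assumed field name,
    // "count" is the field used by the reduce() in the post.
    static Map<String, Object> event(String key, int count) {
        Map<String, Object> e = new HashMap<>();
        e.put("key", key);
        e.put("count", count);
        return e;
    }

    // Sums the "count" field per key; a TreeMap keeps the keys sorted for printing.
    static Map<String, Integer> expectedCounts(List<Map<String, Object>> events) {
        Map<String, Integer> perKey = new TreeMap<>();
        for (Map<String, Object> e : events) {
            String key = e.get("key").toString();
            int count = Integer.parseInt(e.get("count").toString());
            perKey.merge(key, count, Integer::sum);
        }
        return perKey;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> events = new ArrayList<>();
        events.add(event("a", 1));
        events.add(event("b", 1));
        events.add(event("a", 1));
        // Every key present in the input should show up here exactly once.
        expectedCounts(events).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

Comparing a table like this against what the job emits with apply() and with reduce() would show which keys go missing.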

On Mon, 24 Oct 2016 at 18:11 Sendoh <[hidden email]> wrote:
Hi Flink users,

I am seeing strange behavior: data goes missing with reduce() but not with
apply(). We do not see this with 1.0.3, but we do see it with 1.1.3. By
"missing data" I mean that no events at all show up for some of the assigned
keys, not that the event counts are wrong.

The code is as follows.

DataStream<Map<String, Object>> streams = env
        .addSource(new FlinkKafkaConsumer09<>(topicList, new SimpleStringSchema(), properties))
        .name("kafka_topics")
        .rebalance()
        .flatMap(new Eventsmap(events))
        .assignTimestampsAndWatermarks(new EventWatermark());

DataStream<Map<String, Object>> count = streams
        .keyBy(new CompoundJsonKeySelector())
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.minutes(3))
        // apply() is ok
        // .apply(new WindowFunction<Map<String, Object>, Map<String, Object>, String, TimeWindow>() {
        //     @Override
        //     public void apply(String s, TimeWindow timeWindow, Iterable<Map<String, Object>> iterable,
        //                       Collector<Map<String, Object>> collector) throws Exception {
        //         Iterator<Map<String, Object>> it = iterable.iterator();
        //         collector.collect(it.next());
        //     }
        // });
        // reduce() loses data
        .reduce(new ReduceFunction<Map<String, Object>>() {
            @Override
            public Map<String, Object> reduce(Map<String, Object> v1, Map<String, Object> v2) throws Exception {
                int newCount = Integer.parseInt(v1.get("count").toString())
                        + Integer.parseInt(v2.get("count").toString());
                v2.put("count", newCount);
                return v2;
            }
        });

Best,

Is there anything you would suggest we try in order to
figure out the root cause?
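
One thing that could be tried, purely as an assumption and not as a confirmed diagnosis: the reduce() above updates v2 in place and returns it. Whether that has anything to do with the missing keys is not established in this thread. The sketch below shows the same arithmetic written so that neither input is modified. It is plain Java without Flink, and the class name NonMutatingReduce is hypothetical. The method body could be dropped into the ReduceFunction from the post.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch: same count arithmetic as the posted reduce(),
// but the result is a fresh map and both inputs stay untouched.
class NonMutatingReduce {

    static Map<String, Object> reduce(Map<String, Object> v1, Map<String, Object> v2) {
        int newCount = Integer.parseInt(v1.get("count").toString())
                + Integer.parseInt(v2.get("count").toString());
        // Shallow copy of v2 so that all its other fields are carried over.
        Map<String, Object> result = new HashMap<>(v2);
        result.put("count", newCount);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> v1 = new HashMap<>();
        v1.put("count", 1);
        Map<String, Object> v2 = new HashMap<>();
        v2.put("count", 2);

        Map<String, Object> merged = reduce(v1, v2);
        // The inputs keep their original counts; only the new map holds the sum.
        System.out.println(v1.get("count") + " " + v2.get("count") + " " + merged.get("count"));
    }
}
```

If the behavior is the same with this variant, in-place mutation can at least be ruled out as a factor.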



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/missing-data-in-window-reduce-while-apply-is-ok-tp9689.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.