Cannot see all events in window apply() for big input

Posted by Hung on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Cannot-see-all-events-in-window-apply-for-big-input-tp9945.html

Hi Flink users,

We have an issue: not all events show up in the window apply() function, even though we can see all of them before the window operation.

The input comes from Kafka: at least 6 topics, at least 30 GB in total. We have tested both locally in the IDE and on a cluster, using Flink 1.1.3 and 1.0.3.

It works when:
- using version 1.0.3, where we can see all events, but we lose around 1/3 of them
- using processing time
- setting the Kafka offset to latest
- using fewer topics (3 topics)

But we want to use event time with the offset set to earliest, and read 6 or more topics.

Our test code:

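    import java.util.List;
    import java.util.Properties;

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    import org.apache.flink.util.Collector;
    // JSONObject, JSONMap, TestWatermark, Config, eventsConf, parameter and
    // getAllEventTopic() are our own code (not shown here)

    List<String> topicList = getAllEventTopic(eventsConf);

    // Start Flink job
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", Config.bootstrapServers);
    properties.setProperty("group.id", parameter.getRequired("groupId"));
    // All events show up with "latest"; with "earliest" some are missing
    properties.setProperty("auto.offset.reset", "earliest");

    DataStream<String> streams = env.addSource(
            new FlinkKafkaConsumer09<>(topicList, new SimpleStringSchema(), properties));

    DataStream<JSONObject> jsonStreams = streams.flatMap(new JSONMap());

    // Our use case actually uses keyBy() and reduce(). This is for testing purposes.
    jsonStreams.assignTimestampsAndWatermarks(new TestWatermark())
            .rebalance()
            .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.seconds(10))
            .apply(new AllWindowFunction<JSONObject, Object, TimeWindow>() {
                @Override
                public void apply(TimeWindow timeWindow, Iterable<JSONObject> iterable,
                                  Collector<Object> collector) throws Exception {
                    // Forward every element of the window unchanged
                    for (JSONObject json : iterable) {
                        collector.collect(json);
                    }
                }
            })
            .writeAsText("test", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

    // The job only runs once execute() is called
    env.execute();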
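One thing worth checking is whether the missing events are being discarded as late. Below is a simplified, stdlib-only model (not Flink's source) of the rule the window settings above imply, assuming Flink's documented behavior that an element is dropped once the watermark reaches the end of its window (end - 1) plus the allowed lateness. The class and method names are hypothetical, and the watermark and timestamps in main() are made-up values for illustration.

```java
// Simplified model (not Flink's implementation) of how 1-minute tumbling
// event-time windows with 10 s of allowed lateness treat an incoming element.
public final class LatenessModel {

    static final long WINDOW_SIZE_MS = 60_000L;      // Time.minutes(1)
    static final long ALLOWED_LATENESS_MS = 10_000L; // Time.seconds(10)

    // Start of the tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Assumed rule: the element is dropped once the watermark has reached the
    // window's last timestamp (end - 1) plus the allowed lateness.
    static boolean isDropped(long timestampMs, long watermarkMs, long sizeMs, long latenessMs) {
        long maxTimestamp = windowStart(timestampMs, sizeMs) + sizeMs - 1;
        return maxTimestamp + latenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        // Hypothetical scenario: the single watermark is already at minute 5,
        // while records from minute 1 are still arriving from another topic.
        long watermark = 5 * 60_000L;
        long[] timestamps = {60_000L, 90_000L, 119_999L};
        for (long ts : timestamps) {
            long start = windowStart(ts, WINDOW_SIZE_MS);
            System.out.printf("ts=%d window=[%d, %d) dropped=%b%n",
                    ts, start, start + WINDOW_SIZE_MS,
                    isDropped(ts, watermark, WINDOW_SIZE_MS, ALLOWED_LATENESS_MS));
        }
    }
}
```

With a 10-second lateness, an element from the 1-minute window [60000, 120000) is only accepted while the watermark is below 129999, so any topic that lags behind the others by more than that could lose events under this model.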

Are there any suggestions for what we could try to fix this issue?

Best,

Sendoh