
Re: Cannot see all events in window apply() for big input

Posted by Till Rohrmann on Nov 07, 2016; 4:20pm
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Cannot-see-all-events-in-window-apply-for-big-input-tp9945p9949.html

Hi Sendoh,

from your description it's really hard to figure out what the problem could be. The first thing to do would be to check how many records you actually consume from Kafka and how many are emitted by the window. Next I would take a look at the timestamp extractor: can it be that records are discarded because they have a wrong timestamp? It could also be that the elements arrive out of order.
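To make the out-of-order concern concrete, here is a minimal sketch, in plain Java with no Flink dependencies, of how a watermark that trails the highest seen timestamp causes records for an already-purged window to be dropped. The class name and the 5-second out-of-orderness bound are made up; the 1-minute window and 10-second allowed lateness mirror the code below.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {
    static final long OUT_OF_ORDERNESS_MS = 5_000;  // assumed extractor bound
    static final long WINDOW_MS = 60_000;           // tumbling 1-minute window
    static final long ALLOWED_LATENESS_MS = 10_000; // allowedLateness(Time.seconds(10))

    long maxSeenTimestamp = Long.MIN_VALUE;
    final List<Long> accepted = new ArrayList<>();
    final List<Long> dropped = new ArrayList<>();

    long currentWatermark() {
        // The watermark trails the highest timestamp seen so far.
        return maxSeenTimestamp - OUT_OF_ORDERNESS_MS;
    }

    void process(long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
        // End of the tumbling window this record belongs to:
        long windowEnd = (eventTimestamp / WINDOW_MS) * WINDOW_MS + WINDOW_MS;
        // Once the watermark passes windowEnd + allowed lateness, the window
        // state is purged and further records for it are silently discarded.
        if (currentWatermark() >= windowEnd + ALLOWED_LATENESS_MS) {
            dropped.add(eventTimestamp);
        } else {
            accepted.add(eventTimestamp);
        }
    }

    public static void main(String[] args) {
        WatermarkSketch s = new WatermarkSketch();
        s.process(60_000);   // watermark advances to 55_000
        s.process(200_000);  // watermark advances to 195_000
        s.process(30_000);   // its window [0, 60_000) is long purged -> dropped
        System.out.println("accepted=" + s.accepted + " dropped=" + s.dropped);
        // prints accepted=[60000, 200000] dropped=[30000]
    }
}
```

If roughly a third of the events trail the watermark like the last record above, that alone would explain the missing output.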

Cheers,
Till

On Mon, Nov 7, 2016 at 4:15 PM, Sendoh <[hidden email]> wrote:
Hi Flink users,

we have an issue where we cannot see all events in the window apply()
function, although we do see them before the window operation.

The input is from Kafka and spans at least 6 topics, at least 30 GB in
total. We have tested locally in the IDE and on a cluster, using 1.1.3 and
1.0.3.

It works when:
- using version 1.0.3 (though we still lose around 1/3 of the events)
- using processing time
- setting the Kafka offset to latest
- using fewer topics (3 topics)

But we want to use offset as earliest in event time, and read 6 topics or
more.
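Starting from the earliest offset across six topics makes exactly this kind of out-of-orderness plausible: a topic with fresh data can drive event time forward while another topic is still replaying old records, which then arrive behind the watermark. A minimal sketch of that effect, in plain Java with no Flink dependencies, with all timestamps and the 10-second bound made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class MultiTopicSkewSketch {
    // Given an interleaved stream of timestamps, return those that arrive
    // behind a watermark trailing the maximum seen timestamp by `bound`.
    public static List<Long> lateRecords(List<Long> interleaved, long bound) {
        long maxSeen = Long.MIN_VALUE;
        List<Long> late = new ArrayList<>();
        for (long ts : interleaved) {
            maxSeen = Math.max(maxSeen, ts);
            long watermark = maxSeen - bound;
            if (ts < watermark) { // record is already behind the watermark
                late.add(ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Fresh topic (timestamps ~1,000s) interleaved with a topic
        // replaying its backlog from the earliest offset (~5s):
        List<Long> stream = List.of(1_000_000L, 5_000L, 1_001_000L, 6_000L);
        System.out.println(lateRecords(stream, 10_000L));
        // prints [5000, 6000]
    }
}
```

If something like this is happening, the missing events should show up as records whose timestamps already lie behind the watermark when they reach the window operator.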

Our test code:

        List<String> topicList = getAllEventTopic(eventsConf);

        // Start Flink job
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", Config.bootstrapServers);
        properties.setProperty("group.id", parameter.getRequired("groupId"));
        // It works
        properties.setProperty("auto.offset.reset", "earliest");

        DataStream<String> streams = env.addSource(
                new FlinkKafkaConsumer09<>(topicList, new SimpleStringSchema(), properties));

        DataStream<JSONObject> jsonStreams = streams.flatMap(new JSONMap());

        // We actually use keyBy(); this is for testing purposes.
        jsonStreams.assignTimestampsAndWatermarks(new TestWatermark())
                .rebalance()
                .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
                .allowedLateness(Time.seconds(10))
                .apply(new AllWindowFunction<JSONObject, Object, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow timeWindow, Iterable<JSONObject> iterable,
                                      Collector<Object> collector) throws Exception {
                        // Forward every element in the window unchanged.
                        for (JSONObject json : iterable) {
                            collector.collect(json);
                        }
                    }
                })
                .writeAsText("test", FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1);

Is there anything we could try in order to fix this issue?

Best,

Sendoh


