Cannot see all events in window apply() for big input


Hung
Hi Flink users,

We cannot see all events in the window apply() function, although we do see them before the window operation.

The input comes from Kafka and spans at least 6 topics, totaling at least 30 GB. We have tested both locally in the IDE and on a cluster, using Flink 1.1.3 and 1.0.3.

It works when:
- we use version 1.0.3: events show up in apply(), but we still lose around 1/3 of them
- we use processing time instead of event time
- we set the Kafka offset to latest
- we read fewer topics (3 topics)

However, we want to start from the earliest offset, use event time, and read 6 or more topics.

Our test code:

        List<String> topicList = getAllEventTopic(eventsConf);

        // Start Flink job
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", Config.bootstrapServers);
        properties.setProperty("group.id", parameter.getRequired("groupId"));
        // Works with "latest"; events go missing with "earliest"
        properties.setProperty("auto.offset.reset", "earliest");

        DataStream<String> streams = env.addSource(
                new FlinkKafkaConsumer09<>(topicList, new SimpleStringSchema(), properties));

        DataStream<JSONObject> jsonStreams = streams.flatMap(new JSONMap());

        // Our use case actually uses keyBy() and reduce(). This is for testing purposes.
        jsonStreams
                .assignTimestampsAndWatermarks(new TestWatermark())
                .rebalance()
                .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
                .allowedLateness(Time.seconds(10))
                .apply(new AllWindowFunction<JSONObject, Object, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow timeWindow, Iterable<JSONObject> iterable,
                                      Collector<Object> collector) throws Exception {
                        // Forward every element of the window unchanged
                        for (JSONObject event : iterable) {
                            collector.collect(event);
                        }
                    }
                })
                .writeAsText("test", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

Do you have any suggestions for what we could try to fix this issue?

Best,

Sendoh

Re: Cannot see all events in window apply() for big input

Till Rohrmann
Hi Sendoh,

from your description it's really hard to figure out what the problem could be. The first thing to do would be to check how many records you actually consume from Kafka and how many items are output. Next, I would take a look at the timestamp extractor. Could it be that records are discarded because they have a wrong timestamp? It could also be that the elements arrive out of order.
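
One way to check the out-of-order hypothesis is to measure how far the timestamps go backwards in the stream before the window. Below is a minimal, Flink-free sketch, assuming the event-time timestamps (epoch milliseconds) have already been collected into a list, for example from a sample of the Kafka topics; OutOfOrderStats is a hypothetical name, not a Flink class.

```java
import java.util.List;

/** Hypothetical diagnostic: how far out of order is a sequence of event timestamps? */
final class OutOfOrderStats {
    final long outOfOrderCount; // records whose timestamp is below the running maximum
    final long maxRegressionMs; // largest gap between the running maximum and a later, smaller timestamp

    private OutOfOrderStats(long outOfOrderCount, long maxRegressionMs) {
        this.outOfOrderCount = outOfOrderCount;
        this.maxRegressionMs = maxRegressionMs;
    }

    static OutOfOrderStats of(List<Long> timestamps) {
        long runningMax = Long.MIN_VALUE;
        long count = 0;
        long maxRegression = 0;
        for (long ts : timestamps) {
            if (ts < runningMax) {
                // This record goes back in time relative to what was already seen
                count++;
                maxRegression = Math.max(maxRegression, runningMax - ts);
            } else {
                runningMax = ts;
            }
        }
        return new OutOfOrderStats(count, maxRegression);
    }
}
```

If maxRegressionMs is much larger than the lag built into the watermark plus the allowed lateness, records are likely being discarded as late.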

Cheers,
Till

On Mon, Nov 7, 2016 at 4:15 PM, Sendoh <[hidden email]> wrote:
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cannot-see-all-events-in-window-apply-for-big-input-tp9945.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.


Re: Cannot see all events in window apply() for big input

Hung
Hi Till,

Thank you for the suggestions. We know the timestamps are correct because another Flink job runs correctly on three of the topics. We also know the operators before the window work well, because we checked their output before window apply().

What I am currently unsure about is how much parallelism the window operation needs when reprocessing skewed input from Kafka: it works with fewer events, and the small topics always appear in the output while the big topics disappear.

Best,

Sendoh

Re: Cannot see all events in window apply() for big input

Till Rohrmann

Does this other job also perform a window operation based on event time?

What do you mean by “I have a doubt is the necessary parallelism for window operation if reprocessing a skew input from Kafka”?

Also be aware that the windowAll operation is executed with a degree of parallelism (DOP) of 1, making it effectively a non-parallel operation.

Have you tried switching to the latest Flink version for the tests?

Cheers,
Till


On Mon, Nov 7, 2016 at 5:43 PM, Sendoh <[hidden email]> wrote:

Re: Cannot see all events in window apply() for big input

Hung
Yes, the other job also uses event-time windows. We tried 1.2-SNAPSHOT and 1.1.3; with the older 1.0.3 we lost much less data. We have already tried both windowAll() and keyBy().window(), as well as a very small lag and a 1-millisecond window.

My doubt comes from the fact that smaller input works while bigger input has this issue (events disappear).

For example, eventA events with timestamps after Oct. 24 disappear, and eventA reappears around 5 minutes later with timestamps from Nov. 08; all events in between (10-25 to 11-07) are missing. The window output stalls for around 5 minutes. However, if this Flink job reads only eventA, we see all of them.

It looks as if, when there is a lot of data, the data gets stuck in that operator and the watermark that should trigger the window for those events arrives too late. Is that right?

Best,

Sendoh


Re: Cannot see all events in window apply() for big input

Hung
Hi,

Could the issue be that the events are too far out of order and the watermark is global?

We want to count events per event type per day, and the data looks like this:

eventA, 10-29-XX
eventB, 11-02-XX
eventB, 11-02-XX
eventB, 11-03-XX
eventB, 11-04-XX
....
....
eventA, 10-29-XX
eventA, 10-30-XX
eventA, 10-30-XX
.
.
.
eventA, 11-04-XX


There are many more eventA events than eventB events, and it looks like we lost the counts of eventA for 10-29 and 10-30, while we do have the count of eventA for 11-04-XX.
Could the problem be that the watermark is global rather than per event type?

Best,

Sendoh

Re: Cannot see all events in window apply() for big input

Till Rohrmann
Hi Sendoh,

Flink should actually never lose data unless it is so late that it arrives after the allowed lateness. This should be independent of the total data size.

The watermarks are indeed global and not bound to a specific input element or a group. For example, suppose you create the watermarks from the timestamp information of your events and you have the following input event sequence: (eventA, 01-01), (eventB, 02-01), (eventC, 01-02). Then you would generate the watermark W(02-01) after the second event. The third event would then be a late element, and if it exceeds the allowed lateness, it will be discarded.
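
This example can be replayed without Flink. The sketch below is modeled on our understanding of Flink's rule that an element is dropped once the current watermark has reached the last timestamp of its window plus the allowed lateness. It assumes that the watermark equals the highest timestamp seen so far and is emitted right after every element, that the dates are MM-DD in 2016 (UTC), and it reuses the 1-minute window and 10-second lateness from the original job. LateElementSimulation and its methods are hypothetical names.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of a global watermark with tumbling event-time windows. */
final class LateElementSimulation {
    static final long WINDOW_SIZE_MS = 60_000;      // like TumblingEventTimeWindows.of(Time.minutes(1))
    static final long ALLOWED_LATENESS_MS = 10_000; // like allowedLateness(Time.seconds(10))

    // Assumption: the example dates are MM-DD in 2016, UTC midnight
    static long millis(int month, int day) {
        return LocalDate.of(2016, month, day).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    }

    /** Dropped when the window's last timestamp plus the allowed lateness is not after the watermark. */
    static boolean isDropped(long timestamp, long watermark) {
        long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
        long windowMaxTimestamp = windowStart + WINDOW_SIZE_MS - 1;
        return windowMaxTimestamp + ALLOWED_LATENESS_MS <= watermark;
    }

    /** Returns the labels of dropped events; the watermark is the max timestamp seen so far. */
    static List<String> droppedEvents(String[] labels, long[] timestamps) {
        List<String> dropped = new ArrayList<>();
        long watermark = Long.MIN_VALUE; // no watermark yet
        for (int i = 0; i < labels.length; i++) {
            if (isDropped(timestamps[i], watermark)) {
                dropped.add(labels[i]);
            }
            // Assumption: a watermark is emitted right after each element
            watermark = Math.max(watermark, timestamps[i]);
        }
        return dropped;
    }

    public static void main(String[] args) {
        String[] labels = {"eventA", "eventB", "eventC"};
        long[] timestamps = {millis(1, 1), millis(2, 1), millis(1, 2)};
        System.out.println(droppedEvents(labels, timestamps)); // prints [eventC]
    }
}
```

Only (eventC, 01-02) is dropped: after (eventB, 02-01) the watermark is a month ahead of it, far beyond the 10 seconds of allowed lateness.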

If you generate the watermarks from a timestamp field of the events, you have to make sure that the events in your queue have monotonically increasing timestamps.

Cheers,
Till

On Tue, Nov 8, 2016 at 3:37 PM, Sendoh <[hidden email]> wrote:

Re: Cannot see all events in window apply() for big input

Hung
Thank you for confirming.

What would you consider an efficient way to avoid a global watermark? The following logic fails to build a watermark per keyed stream:
jsonStreams.keyBy(new JsonKeySelector()).assignTimestampsAndWatermarks(new JsonWatermark()).keyBy(new JsonKeySelector()).window(....

So would the solution be to use split(), or to implement an event-type-aware AssignerWithPeriodicWatermarks together with a custom EventTimeTrigger?

Best,

Sendoh

Re: Cannot see all events in window apply() for big input

Till Rohrmann
Flink does not support per-key or type-sensitive watermarks. The underlying assumption is that you have a global watermark which defines the progress with respect to event time in your topology.

The easiest way would be to have an input with monotonically increasing timestamps. Alternatively, you can define the maximum lag between the watermark and the timestamps and then generate watermarks with w = timestamp - maxLag, where timestamp is the highest timestamp seen so far. That way you allow elements to be out of order by a certain amount of event time.
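
The w = timestamp - maxLag approach can be sketched without Flink as follows. BoundedLagWatermark is a hypothetical class, and timestamp is taken to be the highest timestamp seen so far; Flink's own BoundedOutOfOrdernessTimestampExtractor is built on the same idea.

```java
/**
 * Hypothetical, Flink-free sketch of a bounded-lag watermark:
 * watermark = (highest timestamp seen so far) - maxLag.
 */
final class BoundedLagWatermark {
    private final long maxLagMs;
    private long maxTimestamp;
    private boolean seenAny = false;

    BoundedLagWatermark(long maxLagMs) {
        this.maxLagMs = maxLagMs;
    }

    void onEvent(long timestamp) {
        // An out-of-order (smaller) timestamp never moves the maximum backwards
        maxTimestamp = seenAny ? Math.max(maxTimestamp, timestamp) : timestamp;
        seenAny = true;
    }

    /** Long.MIN_VALUE until the first event, so nothing is considered late before that. */
    long currentWatermark() {
        return seenAny ? maxTimestamp - maxLagMs : Long.MIN_VALUE;
    }
}
```

With the earlier example and a maxLag of 31 days, the watermark after (eventB, 02-01) is 01-01, so (eventC, 01-02) is no longer late. The price is that every window fires maxLag later in event time and its state is kept that much longer.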

Cheers,
Till

On Tue, Nov 8, 2016 at 5:02 PM, Sendoh <[hidden email]> wrote:

Re: Cannot see all events in window apply() for big input

Hung
Hi,

We let the watermark proceed at the earliest timestamp among all event types. Our test results look correct.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.joda.time.DateTime;

/*
 * Watermark proceeds at the earliest timestamp among all the event types
 */
public class EventsWatermark implements AssignerWithPeriodicWatermarks<Map<String, Object>> {

    // Lag between the watermark and the slowest event type: 3 minutes
    private final long maxTimeLag = 180000;

    // Earliest of the per-type timestamps; stays 0 (watermark does not advance)
    // until every event type has been seen
    private long earliestTimestamp = 0;
    // Highest timestamp seen so far for each event type
    private final Map<String, Long> eventTimestampMap = new HashMap<>();
    private final int eventSize;

    public EventsWatermark(int eventSize) {
        this.eventSize = eventSize;
    }

    @Override
    public long extractTimestamp(Map<String, Object> element, long previousElementTimestamp) {
        // Config.timeFormatter is our own Joda-Time formatter for "occurred_at"
        long occurredAtLong = DateTime.parse(element.get("occurred_at").toString(), Config.timeFormatter).getMillis();
        String eventType = element.get("event_type").toString();

        // Update the timestamp of this event type, keeping only the highest value,
        // so an out-of-order event cannot move the watermark backwards
        Long previous = eventTimestampMap.get(eventType);
        if (previous == null || occurredAtLong > previous) {
            eventTimestampMap.put(eventType, occurredAtLong);
        }

        // Once all event types have been seen, the watermark can proceed
        // to the earliest of their timestamps
        if (eventTimestampMap.size() >= eventSize) {
            earliestTimestamp = Collections.min(eventTimestampMap.values());
        }
        return occurredAtLong;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(earliestTimestamp - maxTimeLag);
    }
}
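
The same slowest-type idea can be unit tested without a Flink cluster. In this hypothetical sketch (SlowestTypeWatermark is not a Flink class), the watermark is held back until every expected event type has been seen, and then follows the smallest of the per-type maximum timestamps minus a fixed lag.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, Flink-free version of the per-type idea: track the highest
 * timestamp per event type and let the watermark follow the slowest type.
 */
final class SlowestTypeWatermark {
    private final int expectedTypes;
    private final long maxLagMs;
    private final Map<String, Long> maxTimestampPerType = new HashMap<>();

    SlowestTypeWatermark(int expectedTypes, long maxLagMs) {
        this.expectedTypes = expectedTypes;
        this.maxLagMs = maxLagMs;
    }

    void onEvent(String eventType, long timestamp) {
        // Keep the highest timestamp per type; out-of-order events do not lower it
        maxTimestampPerType.merge(eventType, timestamp, Long::max);
    }

    /** Held at Long.MIN_VALUE until every expected type has been seen at least once. */
    long currentWatermark() {
        if (maxTimestampPerType.size() < expectedTypes) {
            return Long.MIN_VALUE;
        }
        return Collections.min(maxTimestampPerType.values()) - maxLagMs;
    }
}
```

Two caveats apply to this approach: each parallel instance of the assigner only sees the Kafka partitions it reads and computes its own watermark, while downstream operators use the minimum watermark over their inputs; and an event type that stops producing data holds the watermark back for all types.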

Cheers,

Sendoh