Hi Flink users,
we have an issue: we do not see all events in the window apply() function, although we do see them before the window operation. The input is from Kafka and contains at least 6 topics, at least 30 GB in total, and we have tested locally in the IDE and on a cluster using 1.1.3 and 1.0.3. It works when:
- using version 1.0.3 (we can see all events, but we still lost around 1/3 of them)
- using processing time
- setting the Kafka offset to latest
- using fewer topics (3 topics)
But we want to use the earliest offset with event time, and read 6 or more topics.

Our test code:

List<String> topicList = getAllEventTopic(eventsConf);

// Start Flink job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", Config.bootstrapServers);
properties.setProperty("group.id", parameter.getRequired("groupId"));
// It works with "latest"
properties.setProperty("auto.offset.reset", "earliest");

DataStream<String> streams = env.addSource(
        new FlinkKafkaConsumer09<>(topicList, new SimpleStringSchema(), properties));
DataStream<JSONObject> jsonStreams = streams.flatMap(new JSONMap());

// Our use case actually uses keyBy() and reduce(). This is for testing purposes.
jsonStreams.assignTimestampsAndWatermarks(new TestWatermark())
        .rebalance()
        .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
        .allowedLateness(Time.seconds(10))
        .apply(new AllWindowFunction<JSONObject, Object, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<JSONObject> iterable, Collector
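For the test, the window function only needs to count what it receives per window; a minimal sketch of such a function could look like this (the Tuple2 output type and the counting body are only illustrative, not our exact code):

public static class CountPerWindow
        implements AllWindowFunction<JSONObject, Tuple2<Long, Long>, TimeWindow> {

    @Override
    public void apply(TimeWindow window, Iterable<JSONObject> values,
                      Collector<Tuple2<Long, Long>> out) throws Exception {
        long count = 0;
        for (JSONObject ignored : values) {
            count++;
        }
        // emit (window start, number of events that ended up in this window)
        out.collect(new Tuple2<>(window.getStart(), count));
    }
}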
Hi Sendoh, from your description it's really hard to figure out what the problem could be. The first thing to do would be to check how many records you actually consume from Kafka and how many records are emitted downstream. Next I would take a look at the timestamp extractor: can it be that records are discarded because they have a wrong timestamp? It could also be that the elements arrive out of order. Cheers, Till
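PS: as a sketch (the log interval of 100000 is arbitrary), a simple pass-through counter placed right after the Kafka source and another one right before the window would tell you where records start to go missing:

DataStream<JSONObject> countedStream = jsonStreams.map(new RichMapFunction<JSONObject, JSONObject>() {
    private long seen = 0;

    @Override
    public JSONObject map(JSONObject value) throws Exception {
        seen++;
        if (seen % 100000 == 0) {
            // one line per subtask, so counts before and after the window can be compared
            System.out.println("subtask " + getRuntimeContext().getIndexOfThisSubtask()
                    + " has seen " + seen + " records");
        }
        return value;
    }
});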
Hi Till.
Thank you for the suggestion. We know the timestamps are correct because another Flink job runs correctly with the three topics. We also know the operators work well before the window apply(), because we check the result before the window apply(). What I currently doubt is the parallelism needed for the window operation when reprocessing a skewed input from Kafka, because it works with fewer events, and the small topics always appear while the big topics disappear. Best, Sendoh
And this other job also performs a window operation based on event time? What do you mean with “I have a doubt is the necessary parallelism for window operation if reprocessing a skew input from Kafka”? Have you tried switching to the latest Flink version for the tests? Cheers, Till
Yes, the other job performs an event-time window, and we tried 1.2-SNAPSHOT and 1.1.3. With the old version 1.0.3 we lost much, much less data. We have already tried both windowAll() and keyBy().window(), and tried a very tiny lag and a 1-millisecond window.
My doubt comes from the fact that a smaller input works while a bigger input has the issue (events disappear). For example, eventA disappears for timestamps after Oct. 24 and appears again after around 5 minutes with timestamps of Nov. 08, and all events in between (10-25 to 11-07) are missing. The output of the window gets stuck for around 5 minutes. However, if this Flink job only reads eventA, we can see all of them. It looks as if data gets stuck in that operator and the watermark that should trigger the window comes too late when there is a lot of data, or? Best, Sendoh
Hi,
Could the issue be that the events are too out of order and the watermark is global? We want to count events per event type per day, and the data looks like:

eventA, 10-29-XX
eventB, 11-02-XX
eventB, 11-02-XX
eventB, 11-03-XX
eventB, 11-04-XX
....
....
eventA, 10-29-XX
eventA, 10-30-XX
eventA, 10-30-XX
.
.
.
eventA, 11-04-XX

eventA is much, much larger than eventB, and it looks like we lose the count of eventA at 10-29 and 10-30 while we have the count of eventA at 11-04-XX. Could the problem be that the watermark is global rather than per event? Best, Sendoh
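PS: in code, the aggregation we are after is roughly the following sketch (the key selector and the Tuple3 output are only illustrative):

jsonStreams
    .assignTimestampsAndWatermarks(new TestWatermark())
    .keyBy(new KeySelector<JSONObject, String>() {
        @Override
        public String getKey(JSONObject event) throws Exception {
            return event.get("event_type").toString();
        }
    })
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    .apply(new WindowFunction<JSONObject, Tuple3<String, Long, Long>, String, TimeWindow>() {
        @Override
        public void apply(String eventType, TimeWindow window, Iterable<JSONObject> values,
                          Collector<Tuple3<String, Long, Long>> out) throws Exception {
            long count = 0;
            for (JSONObject ignored : values) {
                count++;
            }
            // (event type, start of the day window, count of events of that type)
            out.collect(new Tuple3<>(eventType, window.getStart(), count));
        }
    });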
Hi Sendoh, Flink should actually never lose data unless it is so late that it arrives after the allowed lateness. This should be independent of the total data size. The watermarks are indeed global and not bound to a specific input element or a group. So, for example, if you create the watermarks from the timestamp information of your events and you have the following input event sequence: (eventA, 01-01), (eventB, 02-01), (eventC, 01-02), then you would generate the watermark W(02-01) after the second event. The third event would then be a late element, and if it exceeds the allowed lateness, it will be discarded. What you have to make sure is that the events in your queue have monotonically increasing timestamps if you generate the watermarks from a timestamp field of the events. Cheers, Till
Thank you for confirming.
What do you think would be an efficient way to avoid a global watermark? The following fails to build a watermark per KeyedStream:

jsonStreams.keyBy(new JsonKeySelector())
    .assignTimestampsAndWatermarks(new JsonWatermark())
    .keyBy(new JsonKeySelector())
    .window(....

So would the solution be to use split(), or to implement an AssignerWithPeriodicWatermarks that is aware of the event type, together with a custom EventTimeTrigger? Best, Sendoh
Flink does not support per-key watermarks or type-sensitive watermarks. The underlying assumption is that you have a global watermark which defines the progress with respect to event time in your topology. The easiest way would be to have an input which has monotonically increasing timestamps. Alternatively, you can define the maximum lag between the watermark and the timestamps and then generate watermarks with w = timestamp - maxLag. That way you allow elements to be out of order for a certain amount of event time. Cheers, Till
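PS: a minimal assigner along those lines could look like the sketch below (MyEvent and getTimestampMillis() are only placeholders for your event type, and the 10-second lag is just an example):

public class BoundedLagAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    // allow events to be at most 10 seconds out of order
    private static final long MAX_LAG_MS = 10000L;

    // start so that the first watermark is Long.MIN_VALUE (no underflow)
    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_LAG_MS;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long ts = element.getTimestampMillis();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // the watermark trails the highest timestamp seen so far by the maximum lag
        return new Watermark(currentMaxTimestamp - MAX_LAG_MS);
    }
}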
Hi,
We let the watermark proceed at the earliest timestamp among all event types. Our test result looks correct.

/*
 * Watermark proceeds at the earliest timestamp among all the event types.
 */
public class EventsWatermark implements AssignerWithPeriodicWatermarks<Map<String, Object>> {

    private final long maxTimeLag = 180000;
    private long currentMaxTimestamp;
    private Map<String, Long> eventTimestampMap;
    private int eventSize;

    public EventsWatermark(int eventSize) {
        this.eventSize = eventSize;
        eventTimestampMap = new HashMap<>();
    }

    @Override
    public long extractTimestamp(Map<String, Object> element, long previousElementTimestamp) {
        long occurredAtLong = DateTime.parse(element.get("occurred_at").toString(),
                Config.timeFormatter).getMillis();
        String eventType = element.get("event_type").toString();

        // Update the timestamp of this event type
        eventTimestampMap.put(eventType, occurredAtLong);

        // Haven't collected timestamps of all event types yet, so the watermark is not forwarding
        if (eventSize != eventTimestampMap.size()) {
            currentMaxTimestamp = Math.min(occurredAtLong, currentMaxTimestamp);
        }
        // Otherwise the earliest timestamp among all event types is the watermark that can proceed
        else {
            currentMaxTimestamp = Collections.min(eventTimestampMap.values());
        }
        return occurredAtLong;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxTimeLag);
    }
}

Cheers, Sendoh
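PS: wired into the job, the assigner is used roughly like this (sketch; events stands for the parsed DataStream<Map<String, Object>>, and deriving the event-type count from the topic list is only an assumption):

int eventTypeCount = topicList.size();   // assumption: one event type per topic
DataStream<Map<String, Object>> withTimestamps =
        events.assignTimestampsAndWatermarks(new EventsWatermark(eventTypeCount));

One thing to keep in mind with this approach: each parallel instance of the assigner only sees the events of its own input partitions, so eventTimestampMap is maintained per subtask rather than globally, and the watermark a downstream operator actually works with is the minimum over its input channels.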