Hi,
I'm running a Flink application where data are retrieved from a Kafka broker and forwarded to a Cassandra sink. I've implemented the following watermark emitter:

```java
public class CustomTimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple8<String, String, Date, String, String, Double, Double, Double>> {

    private final long maxOutOfOrderness = 800;
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple8<String, String, Date, String, String, Double, Double, Double> element, long previousElementTimestamp) {
        long timestamp = element.f2.getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
```

and I reorder records in event-time windows:

```java
...
.window(TumblingEventTimeWindows.of(Time.milliseconds(800)))
.apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord, String, TimeWindow>() {
    public void apply(String key, TimeWindow window, Iterable<Harness.KafkaRecord> input, Collector<Harness.KafkaRecord> out) throws Exception {
        ArrayList<Harness.KafkaRecord> list = new ArrayList<Harness.KafkaRecord>();
        for (Harness.KafkaRecord in : input)
            list.add(in);
        Collections.sort(list);
        for (Harness.KafkaRecord output : list)
            out.collect(output);
    }
});
```

Unfortunately, when I check the size of Cassandra's destination table I note that some messages are lost. In three tests I ingested data at 50, 25, and 15 Hz. I expected to see a lower loss percentage at the lower ingestion frequency, but it is the opposite!

P.S.: Kafka ingests 45,000 messages of 1 KB each; the loss percentages are:

- 50 Hz: 0.273%
- 25 Hz: 0.284%
- 15 Hz: 0.302%

My suspicion is that the data are lost because they arrive with too high a lateness and are dropped by Flink. Is that a possibility?

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
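The suspected mechanism can be sketched in plain Java. This is a simplified model of bounded-out-of-orderness watermarking, not Flink's actual runtime (in Flink the periodic watermark advances on a timer, not per element), and `LatenessDemo` / `droppedTimestamps` are made-up names for illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model: the watermark trails the maximum timestamp seen so far
// by 800 ms, and an element is "late" (dropped by an event-time window with
// no allowedLateness) when its timestamp is at or below the watermark.
public class LatenessDemo {
    static final long MAX_OUT_OF_ORDERNESS = 800;

    public static List<Long> droppedTimestamps(long[] timestamps) {
        List<Long> dropped = new ArrayList<>();
        // Start offset by the bound so the initial watermark is Long.MIN_VALUE
        // and the subtraction below cannot underflow.
        long currentMax = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;
        for (long ts : timestamps) {
            long watermark = currentMax - MAX_OUT_OF_ORDERNESS;
            if (ts <= watermark) {      // element is behind the watermark: late
                dropped.add(ts);
            }
            currentMax = Math.max(currentMax, ts);
        }
        return dropped;
    }

    public static void main(String[] args) {
        // 2000 arrives 1000 ms behind the max seen so far (3000), which
        // exceeds the 800 ms bound, so it is dropped.
        long[] ts = {1_000, 3_000, 2_000, 4_000};
        System.out.println(droppedTimestamps(ts)); // [2000]
    }
}
```

In this model any element arriving more than 800 ms behind the current maximum timestamp is discarded, which matches the assigner above and would produce exactly the kind of silent loss observed.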
Hi Andrea,

you are right. Flink's window operators can drop messages which are too late, i.e., which have a timestamp smaller than the last watermark.

There are a couple of options for dealing with late elements:

1. Use more conservative watermarks. This will add latency to your program.
2. Configure an allowedLateness parameter for the windows, but then you have to be able to handle the resulting updates. [2]
3. Use side outputs on windows (will become available with Flink 1.4). [3]

Cheers,
Fabian

2017-11-12 21:29 GMT+01:00 AndreaKinn <[hidden email]>:
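Option 2 can be illustrated with a self-contained sketch. This models the semantics, not the Flink API: with `allowedLateness`, a window's state is kept until the watermark passes `windowEnd + allowedLateness`, so late elements within that slack still update the window instead of being dropped. `AllowedLatenessDemo` and `droppedTimestamps` are made-up names:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of allowedLateness for 800 ms tumbling event-time windows.
public class AllowedLatenessDemo {
    static final long WINDOW_SIZE = 800;

    // Timestamps that would still be dropped at the given watermark:
    // an element is lost only once the watermark has passed its window's
    // end plus the allowed lateness.
    public static List<Long> droppedTimestamps(long[] timestamps, long watermark, long allowedLateness) {
        List<Long> dropped = new ArrayList<>();
        for (long ts : timestamps) {
            long windowEnd = (ts / WINDOW_SIZE) * WINDOW_SIZE + WINDOW_SIZE;
            if (windowEnd + allowedLateness <= watermark) {
                dropped.add(ts);
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        long watermark = 2_200;
        // Without lateness, the element in window [800, 1600) is lost...
        System.out.println(droppedTimestamps(new long[]{1_000, 2_000}, watermark, 0));     // [1000]
        // ...but 1000 ms of allowed lateness keeps that window alive.
        System.out.println(droppedTimestamps(new long[]{1_000, 2_000}, watermark, 1_000)); // []
    }
}
```

The trade-off is that each late arrival within the slack re-fires the window, so the downstream Cassandra sink must tolerate updated results for the same window, which is what "have to be able to handle respective updates" refers to.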
Getting late elements from the side output is already available with Flink 1.3 :)

Regards,
Kien

On 11/13/2017 5:00 PM, Fabian Hueske wrote:
Thanks for the correction! :-)

2017-11-13 13:05 GMT+01:00 Kien Truong <[hidden email]>: