Flink drops messages?

Flink drops messages?

AndreaKinn
Hi,
I'm running a Flink application where data are retrieved from a Kafka broker
and forwarded to a Cassandra sink.
I've implemented the following watermark emitter:

import java.util.Date;

import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomTimestampExtractor implements
        AssignerWithPeriodicWatermarks<Tuple8<String, String, Date, String, String, Double, Double, Double>> {

    // Tolerate events arriving up to 800 ms out of order.
    private final long maxOutOfOrderness = 800;
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple8<String, String, Date, String, String, Double, Double, Double> element,
            long previousElementTimestamp) {
        // The event timestamp is carried in the third tuple field.
        long timestamp = element.f2.getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // The watermark trails the highest timestamp seen so far by maxOutOfOrderness.
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
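
For context, the assigner is attached to the Kafka source roughly like this (just a sketch: the variable name kafkaStream and the setup lines are placeholders, not the exact code of my job):

// Sketch only: wiring the assigner into the job. "kafkaStream" stands for the
// DataStream produced by the FlinkKafkaConsumer source already present in the application.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(200); // how often getCurrentWatermark() is polled, in ms

DataStream<Tuple8<String, String, Date, String, String, Double, Double, Double>> withTimestamps =
        kafkaStream.assignTimestampsAndWatermarks(new CustomTimestampExtractor());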

I have also implemented record reordering within event-time windows:

...
.window(TumblingEventTimeWindows.of(Time.milliseconds(800)))
.apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord, String, TimeWindow>() {

    @Override
    public void apply(String key,
            TimeWindow window,
            Iterable<Harness.KafkaRecord> input,
            Collector<Harness.KafkaRecord> out) throws Exception {

        // Buffer all records of the window, sort them (Harness.KafkaRecord must
        // implement Comparable for Collections.sort) and emit them in order.
        ArrayList<Harness.KafkaRecord> list = new ArrayList<Harness.KafkaRecord>();

        for (Harness.KafkaRecord in : input)
            list.add(in);
        Collections.sort(list);
        for (Harness.KafkaRecord output : list)
            out.collect(output);
    }
});

Unfortunately, when I check the size of the Cassandra destination table I notice
that some messages are lost.

I ran three tests, ingesting data at 50, 25 and 15 Hz. I expected to see a lower
loss percentage at the lower ingestion frequencies, but instead it is the
opposite!

P.S.: Kafka ingests 45,000 messages of 1 KB each; the loss percentages are:

50 Hz: 0.273%
25 Hz: 0.284%
15 Hz: 0.302%

My suspicion is that the data are lost because they arrive with too high a
lateness and are dropped by Flink. Is that a possibility?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink drops messages?

Fabian Hueske-2
Hi Andrea,

you are right. Flink's window operators can drop messages that are too late, i.e., that have a timestamp smaller than the last watermark.
This is expected behavior and is documented in several places [1] [2].

There are a couple of options for dealing with late elements:

1. Use more conservative watermarks. This will add latency to your program.
2. Configure an allowedLateness parameter on the windows, but then you have to be able to handle the resulting updates [2] (see the sketch below).
3. Use side outputs on windows (will become available with Flink 1.4). [3]
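
For option 2 the change is roughly the following (only a sketch; the lateness value is an example and has to be tuned for your data):

...
.window(TumblingEventTimeWindows.of(Time.milliseconds(800)))
// Keep the window state around for an extra 2 seconds (example value) so that
// records arriving after the watermark, but within the allowed lateness, still
// trigger the window again. The Cassandra sink then has to cope with the
// re-emitted (updated) window results.
.allowedLateness(Time.seconds(2))
.apply(...);  // your existing WindowFunction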

Cheers, Fabian


Re: Flink drops messages?

Kien Truong

Getting late elements from a side output is already available with Flink 1.3 :)
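
A rough sketch with the 1.3 API (the OutputTag name, the key selector and the SortingWindowFunction class are just placeholders for your existing code):

// Sketch (Flink 1.3+): collect records that the window drops as late into a side output.
final OutputTag<Harness.KafkaRecord> lateTag =
        new OutputTag<Harness.KafkaRecord>("late-records") {}; // anonymous subclass keeps type information

SingleOutputStreamOperator<Harness.KafkaRecord> windowed = stream
        .keyBy(record -> record.getKey())                       // placeholder key selector
        .window(TumblingEventTimeWindows.of(Time.milliseconds(800)))
        .sideOutputLateData(lateTag)
        .apply(new SortingWindowFunction());                    // your existing reordering WindowFunction

// The late records can then be handled separately, e.g. also written to Cassandra.
DataStream<Harness.KafkaRecord> lateRecords = windowed.getSideOutput(lateTag);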

Regards,

Kien


Re: Flink drops messages?

Fabian Hueske-2
Thanks for the correction! :-)
