Hi,
I'm getting sensor data from a Kafka source and I absolutely need the records to be ordered by the time the data was generated. I've implemented a custom deserializer and used an AscendingTimestampExtractor to handle event time. Of course, I set EventTime as the stream time characteristic. Unfortunately, when I print the stream I see that many records are out of order. Am I doing something wrong? Here is my code, followed by a screenshot of the printed output as proof:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(CHECKPOINT_TIME);
    env.setParallelism(1);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", KAFKA_ADDRESS);
    properties.setProperty("group.id", GROUP_ID);

    DataStream<Tuple6<String, String, Date, String, String, Double>> stream = env
        .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
        .assignTimestampsAndWatermarks(
            new AscendingTimestampExtractor<Tuple6<String, String, Date, String, String, Double>>() {
                @Override
                public long extractAscendingTimestamp(Tuple6<String, String, Date, String, String, Double> element) {
                    // Event time is taken from the Date field f2 (milliseconds since the epoch)
                    return element.f2.getTime();
                }
            })
        .keyBy(0);

    stream.print();

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t985/Screen_Shot_2017-09-07_at_21.png>

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
In reply to AndreaKinn (Fri, Sep 8, 2017 at 4:24 AM):

Hi AndreaKinn,

The AscendingTimestampExtractor does not work the way you think. It should only be applied to streams whose timestamps are naturally monotonically ascending.

Flink uses watermarks to deal with out-of-order data. When a watermark t is received, it means that no more records with timestamps less than or equal to t should arrive. However, you need to implement your own watermark generation policy. There are two basic watermark assigners: AssignerWithPeriodicWatermarks, which generates watermarks periodically, and AssignerWithPunctuatedWatermarks, which generates watermarks when certain records are encountered. For more information, please refer to [1] and [2].

Best,
Xingcan
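To make the watermark semantics described above concrete, here is a minimal pure-Java simulation. It does not use Flink, and the class `WatermarkSimulation` and its method `lateRecords` are hypothetical names for illustration. It assumes a periodic assigner that tracks the highest timestamp seen so far and emits `watermark = max - maxOutOfOrderness`, and, as a simplification, that a watermark is emitted after every record. It reports which records arrive at or behind the current watermark, i.e. the ones a watermark-driven operator such as an event-time window would treat as late.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Pure-Java sketch (no Flink dependency) of a periodic watermark assigner with a bounded
 * out-of-orderness. Simplifying assumption: a watermark is emitted after every record.
 */
final class WatermarkSimulation {

    private WatermarkSimulation() {}

    /** Returns, in arrival order, the timestamps that arrive at or behind the current watermark. */
    static List<Long> lateRecords(long[] arrivalOrder, long maxOutOfOrderness) {
        List<Long> late = new ArrayList<>();
        long currentMaxTimestamp = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE; // no watermark emitted yet
        for (long timestamp : arrivalOrder) {
            // Watermark t means: no more records with timestamp <= t are expected.
            if (timestamp <= watermark) {
                late.add(timestamp);
            }
            currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
            // Highest timestamp seen so far minus the allowed out-of-orderness.
            watermark = currentMaxTimestamp - maxOutOfOrderness;
        }
        return late;
    }
}
```

For the arrival order 1, 3, 2, 5, 4 and a bound of 0 (roughly what an AscendingTimestampExtractor assumes), records 2 and 4 are late; with a bound of 2, none are. Being late does not change what a plain `print()` shows: the print sink does not act on watermarks, so records are printed in arrival order either way.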
Thank you. Following the Flink docs, I also developed a simple custom watermark solution, but I still see unordered records when I print the stream. I have a question about Flink's behaviour: if I understand correctly, Flink doesn't automatically reorder the records in a stream, so if, for instance, a record arrives late, what does Flink do? The docs say that late elements are dropped (the default allowed lateness is 0), but even with this watermark emitter:

    import java.util.Date;

    import org.apache.flink.api.java.tuple.Tuple6;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class CustomTimestampExtractor
            implements AssignerWithPeriodicWatermarks<Tuple6<String, String, Date, String, String, Double>> {

        private static final long serialVersionUID = 5448621759931440489L;

        // How far (in ms) a record may lag behind the highest timestamp seen so far
        private final long maxOutOfOrderness = 0;

        // Highest event timestamp seen so far
        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(Tuple6<String, String, Date, String, String, Double> element,
                                     long previousElementTimestamp) {
            long timestamp = element.f2.getTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // Called periodically by Flink; no record at or before this time is expected afterwards
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }

with maxOutOfOrderness = 0, I still see unordered records in the stream. What I want to obtain is a fully ordered stream. Is there a way to implement that?
In reply to AndreaKinn (Fri, Sep 8, 2017 at 3:50 AM):

As mentioned earlier, the watermark is the basis for reasoning about the overall progression of time. Many operators use the watermark to organize records correctly, e.g. into the correct time-based window. Within that window, however, the records may still be unordered.

That said, some operators do take pains to reorder the records, notably the Flink CEP operator, which needs ordered input to detect temporal patterns correctly. Basically, the operator buffers records until a watermark arrives; all buffered records older than the watermark may then be sorted and processed.

It is tempting to write a standalone operator that simply reorders records as described, but any subsequent repartitioning to downstream operators would reintroduce disorder. Therefore, one must ensure that subsequent processing is done with a 'forward' partitioning strategy.

Hope this helps!
Eron
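The buffer-then-sort idea Eron describes can be sketched in plain Java as below. This is a simplified illustration, not the CEP operator's actual implementation: `ReorderBuffer` is a hypothetical name, records are represented only by their timestamps, and records arriving at or behind the last watermark are simply dropped (an assumption of this sketch; they could be routed elsewhere instead). For the Tuple6 records in this thread, the queue would need a `Comparator` on the f2 timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Pure-Java sketch (no Flink dependency): buffer records until a watermark arrives, then
 * release every buffered record with timestamp <= watermark in timestamp order.
 */
final class ReorderBuffer {

    // Min-heap on timestamp, so the oldest buffered record is always at the head.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private final List<Long> dropped = new ArrayList<>();
    private long lastWatermark = Long.MIN_VALUE;

    /** Buffers a record unless it is already at or behind the last watermark. */
    void onElement(long timestamp) {
        if (timestamp <= lastWatermark) {
            dropped.add(timestamp); // too late to be emitted in order
        } else {
            buffer.add(timestamp);
        }
    }

    /** Advances event time and returns, sorted, every buffered record the watermark covers. */
    List<Long> onWatermark(long watermark) {
        lastWatermark = Math.max(lastWatermark, watermark);
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= lastWatermark) {
            released.add(buffer.poll());
        }
        return released;
    }

    /** Records rejected because they arrived at or behind the watermark. */
    List<Long> dropped() {
        return dropped;
    }
}
```

Feeding 5, 3, 4 and then a watermark of 4 releases 3 and 4 while 5 stays buffered; a later record with timestamp 2 is dropped. The price is latency: each record is held until the watermark passes it, so a larger out-of-orderness bound means more buffering.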
In reply to Eron Wright (Tue, Sep 12, 2017 at 1:48 AM):

Hi AndreaKinn,

Reordering in a streaming environment is quite costly. AFAIK, Flink doesn't provide such a function internally. Watermarks are just one approach to dealing with the out-of-order problem; IMO, they amount to a coarse-grained reordering. Late records should be dropped manually. Maybe you can try adapting your function so that it works on streams with such "coarse-grained" ordering. However, if a fully ordered stream is necessary in your application, I'm afraid you must buffer the records and re-emit them in a user-defined ProcessFunction.

Best,
Xingcan
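The "coarse-grained" ordering that watermarks provide can be illustrated with a plain-Java simulation of event-time tumbling windows (`TumblingWindowSketch` is a hypothetical name and the window size is arbitrary). Windows fire in event-time order once the watermark reaches their last timestamp (start + size - 1), and records for an already-fired window are rejected, which is meant to roughly mirror Flink's event-time windows with the default allowed lateness of 0. Inside a window, records keep their arrival order, so a fully ordered stream still needs the per-record buffering discussed in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Pure-Java sketch (no Flink dependency) of watermark-driven tumbling windows: windows are
 * ordered relative to each other, but records inside a window keep their arrival order.
 */
final class TumblingWindowSketch {

    private final long size; // window length; must be positive
    // Window start -> records in arrival order; TreeMap keeps windows sorted by start time.
    private final TreeMap<Long, List<Long>> windows = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    TumblingWindowSketch(long size) {
        this.size = size;
    }

    /** Adds a record; returns false if its window has already fired (late record, dropped). */
    boolean onElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        if (start + size - 1 <= currentWatermark) {
            return false;
        }
        windows.computeIfAbsent(start, k -> new ArrayList<>()).add(timestamp);
        return true;
    }

    /** Fires (removes and returns) every window whose last timestamp is covered by the watermark. */
    List<List<Long>> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<List<Long>> fired = new ArrayList<>();
        while (!windows.isEmpty() && windows.firstKey() + size - 1 <= currentWatermark) {
            Map.Entry<Long, List<Long>> first = windows.pollFirstEntry();
            fired.add(first.getValue());
        }
        return fired;
    }
}
```

With a window size of 10, the arrivals 3, 1, 12, 7 followed by a watermark of 9 fire the window [0, 10) with its records still in arrival order (3, 1, 7); a later record with timestamp 5 is then rejected as late.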