Handle event time


AndreaKinn
Hi,
I'm getting sensor data from a Kafka source and I absolutely need the records
to be ordered by the time at which the data was generated. I've implemented a
custom deserialiser and used an AscendingTimestampExtractor to handle event
time.
I have, of course, set EventTime as the stream time characteristic.
Unfortunately, when I print the stream I see that many records are
out of order. Am I doing something wrong?
I've attached proof of that:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(CHECKPOINT_TIME);
env.setParallelism(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", KAFKA_ADDRESS);
properties.setProperty("group.id", GROUP_ID);

DataStream<Tuple6<String, String, Date, String, String, Double>> stream = env
        .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
        .assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<Tuple6<String, String, Date, String, String, Double>>() {

                    @Override
                    public long extractAscendingTimestamp(
                            Tuple6<String, String, Date, String, String, Double> element) {
                        // f2 is the Date field used as the event timestamp
                        return element.f2.getTime();
                    }
                })
        .keyBy(0);

stream.print();

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t985/Screen_Shot_2017-09-07_at_21.png>



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Handle event time

Xingcan Cui
Hi AndreaKinn,

The AscendingTimestampExtractor does not work the way you think. It should only be applied to streams whose timestamps
are naturally monotonically ascending.

Flink uses watermarks to deal with out-of-order data. When a watermark t is received, it means there should be no more
records whose timestamps are less than or equal to t. However, you must implement your own watermark generation
policy. There are two basic watermark assigners: AssignerWithPeriodicWatermarks, which generates watermarks periodically,
and AssignerWithPunctuatedWatermarks, which generates watermarks when certain records are encountered.
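
To make the watermark contract described above concrete, here is a minimal plain-Java sketch with no Flink dependency. The class WatermarkSketch and its methods are hypothetical names made up for illustration, and the watermark is recomputed before every record, which is a simplification (a periodic assigner in Flink is only polled at intervals). It tracks the highest timestamp seen, derives the watermark as that maximum minus an allowed out-of-orderness, and reports which records arrive at or behind the watermark. Note that nothing here reorders records; the watermark only tells you which ones are late.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: bounded-out-of-orderness watermark tracking in plain Java. */
final class WatermarkSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Called for every record: remember the highest timestamp seen so far. */
    void observe(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    /** Watermark t: no more records with a timestamp <= t are expected. */
    long currentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet, avoid underflow
        }
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    /** Returns, in arrival order, the timestamps that arrive at or behind the watermark. */
    static List<Long> lateRecords(long[] arrivals, long maxOutOfOrderness) {
        WatermarkSketch tracker = new WatermarkSketch(maxOutOfOrderness);
        List<Long> late = new ArrayList<>();
        for (long timestamp : arrivals) {
            if (timestamp <= tracker.currentWatermark()) {
                late.add(timestamp);
            }
            tracker.observe(timestamp);
        }
        return late;
    }
}
```

For the arrival sequence 1000, 1005, 1002, 1010, 1007, an out-of-orderness of 0 flags 1002 and 1007 as late, while an out-of-orderness of 5 flags nothing. In both cases the records still flow through in arrival order.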

For more information, please refer to [1] and [2].

Best,
Xingcan


On Fri, Sep 8, 2017 at 4:24 AM, AndreaKinn <[hidden email]> wrote:
[...]

Re: Handle event time

AndreaKinn
Thank you. I actually also developed a simple custom watermark solution
following the Flink docs, but I still see unordered records when I print the stream.
I have a doubt about Flink's behaviour: if I understand correctly, Flink doesn't
automatically reorder the records in a stream, so what does Flink do when, for
instance, a record arrives late? The docs say that elements which arrive late
are dropped (the default allowed lateness is 0), but even when using this
watermark emitter:

public class CustomTimestampExtractor
        implements AssignerWithPeriodicWatermarks<Tuple6<String, String, Date, String, String, Double>> {

    private static final long serialVersionUID = 5448621759931440489L;
    private final long maxOutOfOrderness = 0;
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(
            Tuple6<String, String, Date, String, String, Double> element,
            long previousElementTimestamp) {
        long timestamp = element.f2.getTime();
        // Track the highest event timestamp seen so far
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

with maxOutOfOrderness = 0, I still see unordered records in the stream.

What I want is a fully ordered stream. Is there a way to implement
that?




Re: Handle event time

Eron Wright
As mentioned earlier, the watermark is the basis for reasoning about the overall progression of time. Many operators use the watermark to correctly organize records, e.g. into the correct time-based window. Within that window the records may still be unordered. That said, some operators do take pains to reorder the records, notably the Flink CEP operator, which does so in order to correctly detect temporal patterns. Basically, the operator buffers records until a watermark arrives; all buffered records older than the watermark may then be sorted and processed.
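
The buffer-until-watermark idea can be sketched in plain Java as follows. This is not the CEP operator's actual implementation and uses no Flink classes; ReorderBuffer and its methods are hypothetical names, and records are reduced to bare timestamps to keep the example short. Records are held in a priority queue, and each watermark releases, in ascending order, everything at or below it. A record that shows up behind the last watermark is rejected, which is one possible way of handling late data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Illustration only: buffer records and release them in timestamp order on each watermark. */
final class ReorderBuffer {
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long lastWatermark = Long.MIN_VALUE;

    /** Buffers a record; returns false (record dropped) if it is already behind the watermark. */
    boolean add(long timestamp) {
        if (timestamp <= lastWatermark) {
            return false; // late: emitting it now would break the ordering
        }
        buffer.add(timestamp);
        return true;
    }

    /** Advances the watermark and releases, in ascending order, all records with timestamp <= watermark. */
    List<Long> onWatermark(long watermark) {
        lastWatermark = Math.max(lastWatermark, watermark);
        List<Long> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= lastWatermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }
}
```

The cost is latency and memory: nothing is emitted until a watermark passes it, so a larger allowed out-of-orderness means a larger buffer and a longer delay.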

It is tempting to write a standalone operator that simply reorders records as described, but subsequent repartitioning to downstream operators would reintroduce disorder.  Therefore one must ensure that subsequent processing is done with a 'forward' partitioning strategy.
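
A small plain-Java simulation (again not Flink code) shows why ordering does not survive repartitioning. It assumes two upstream channels that are each perfectly sorted and a single downstream consumer that reads from whichever channel happens to deliver next. RepartitionSketch, interleave, and the schedule array are hypothetical names; the schedule stands in for network timing, which the application does not control.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: merging two sorted channels in arrival order can yield an unsorted stream. */
final class RepartitionSketch {

    /** Interleaves channels a and b according to schedule (0 = next record from a, 1 = from b). */
    static List<Long> interleave(List<Long> a, List<Long> b, int[] schedule) {
        List<Long> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        for (int channel : schedule) {
            if (channel == 0 && i < a.size()) {
                out.add(a.get(i++));
            } else if (channel == 1 && j < b.size()) {
                out.add(b.get(j++));
            }
        }
        return out;
    }

    /** True if the timestamps are in non-decreasing order. */
    static boolean isSorted(List<Long> records) {
        for (int k = 1; k < records.size(); k++) {
            if (records.get(k - 1) > records.get(k)) {
                return false;
            }
        }
        return true;
    }
}
```

With channels [1, 3, 5] and [2, 4, 6], the schedule 0,1,0,1,0,1 happens to produce a sorted result, but 0,0,1,0,1,1 produces 1, 3, 2, 5, 4, 6. Since the interleaving depends on timing, only a one-to-one ('forward') connection preserves the order established upstream.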

Hope this helps!
Eron

On Fri, Sep 8, 2017 at 3:50 AM, AndreaKinn <[hidden email]> wrote:
[...]

Re: Handle event time

Xingcan Cui
Hi AndreaKinn,

Reordering in a streaming environment is quite costly. AFAIK, Flink doesn't provide such a function internally.

Watermarks are just one of the approaches to dealing with the out-of-order problem. IMO, they act like a coarse-grained
reordering. The late records should be dropped manually. Maybe you can try changing your function so that it can be applied
to streams with such "coarse-grained" ordering. However, if a fully ordered stream is necessary in your
application, I'm afraid you must cache and re-emit the records in a user-defined ProcessFunction.

Best,
Xingcan


On Tue, Sep 12, 2017 at 1:48 AM, Eron Wright <[hidden email]> wrote:
[...]