
How to assign timestamp for event time in a stream?

jiecxy
The program reads unordered records from a log file and should print the records in time order. But it doesn't change the order. Is there anything wrong with my code? Can anyone give me an example?


This is my program:

Note: the Tokenizer class splits each log line into four parts, like this:
   Sep  6 09:28:01 master systemd: Stopping user-988.slice.
to
  Tuple4<time, master, systemd, Stopping user-988.slice.>
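
For anyone reproducing this, here is a minimal plain-Java sketch of what a tokenizer like this might do. It is a hypothetical stand-in, not the poster's `Tokenizer` (which is not shown). The class name `SyslogLineParser`, the caller-supplied year and the UTC interpretation of the log times are all assumptions, because syslog lines carry neither a year nor a time zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical stand-in for the Tokenizer described above (the real class is
// not shown): splits one syslog line into event time, host, process, message.
final class SyslogLineParser {

    // Syslog timestamps carry no year, so the caller supplies one (assumption).
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("MMM d HH:mm:ss yyyy", Locale.ENGLISH);

    // Plays the role of Tuple4<Long, String, String, String>.
    static final class ParsedLine {
        final long timestampMillis;
        final String host;
        final String process;
        final String message;

        ParsedLine(long timestampMillis, String host, String process, String message) {
            this.timestampMillis = timestampMillis;
            this.host = host;
            this.process = process;
            this.message = message;
        }
    }

    static ParsedLine parse(String line, int year) {
        // Split on runs of whitespace ("Sep  6" pads the day with a space);
        // the limit of 6 keeps the free-text message as one trailing token.
        String[] parts = line.trim().split("\\s+", 6);
        if (parts.length < 6) {
            throw new IllegalArgumentException("Unexpected log line: " + line);
        }
        String stamp = parts[0] + " " + parts[1] + " " + parts[2] + " " + year;
        // Assumption: log times are interpreted as UTC.
        long millis = LocalDateTime.parse(stamp, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
        // Drop the trailing colon from "systemd:".
        String process = parts[4].endsWith(":")
                ? parts[4].substring(0, parts[4].length() - 1)
                : parts[4];
        return new ParsedLine(millis, parts[3], process, parts[5]);
    }

    public static void main(String[] args) {
        ParsedLine p = parse("Sep  6 09:28:01 master systemd: Stopping user-988.slice.", 2016);
        System.out.println(p.timestampMillis + " | " + p.host + " | " + p.process + " | " + p.message);
    }
}
```

The resulting millisecond value is what would go into `f0` and be returned by the timestamp extractor.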

---------------------------------------------------------------------------------------------------------
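        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // use the timestamps carried by the records (event time), not processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // not used in the rest of this snippet
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // `properties` holds the Kafka consumer configuration and is defined elsewhere
        DataStream<String> text = env.addSource(new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(), properties));
        DataStream<Tuple4<Long, String, String, String>> messages =
                text.flatMap(new Tokenizer())
                        // AscendingTimestampExtractor expects timestamps in ascending order;
                        // it assigns timestamps and watermarks but does not reorder records
                        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<Long, String, String, String>>() {
                            @Override
                            public long extractAscendingTimestamp(Tuple4<Long, String, String, String> tuple4) {
                                // f0 holds the parsed log time, used as the event timestamp
                                return tuple4.f0;
                            }
                        });
        messages.print().setParallelism(1);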
---------------------------------------------------------------------------------------------------------

The input looks like this:
Sep  6 09:28:01 master systemd: Stopping user-988.slice1.
Sep  6 09:28:04 master systemd: Stopping user-988.slice4.
Sep  6 09:28:03 master systemd: Stopping user-988.slice3.
Sep  6 09:28:02 master systemd: Stopping user-988.slice2.

But the output is the same as the input; the records are not reordered to 1 - 2 - 3 - 4 (ordered by time).

Re: How to assign timestamp for event time in a stream?

Till Rohrmann
Hi,

Using event time and assigning timestamps does not reorder the stream records. To sort them, you can define a window and sort the elements of each window, for example with Java's built-in sorting. Alternatively, you can write your own operator that buffers elements in a priority queue and, whenever a watermark arrives, emits all elements with timestamps up to the current watermark.
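
A minimal sketch of the first suggestion, written in plain Java rather than against the Flink API so that it runs on its own: it groups a finite list of records into tumbling event-time windows and sorts each window by timestamp. The class `TumblingWindowSort`, its method name and the 5-second window size are hypothetical; in a Flink job the grouping would be done by an event-time window and the sort inside the window function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;
import java.util.function.ToLongFunction;

// Framework-free sketch of "window, then sort each window". It is NOT Flink
// code: it groups a finite batch into tumbling event-time windows, sorts each
// window by timestamp, and emits the windows in order of their start time.
final class TumblingWindowSort {

    static <T> List<T> sortWithinWindows(List<T> input, ToLongFunction<T> timestampOf, long windowSizeMillis) {
        // Key each element by the start of its tumbling window; TreeMap keeps windows ordered.
        TreeMap<Long, List<T>> windows = new TreeMap<>();
        for (T element : input) {
            long ts = timestampOf.applyAsLong(element);
            long windowStart = ts - Math.floorMod(ts, windowSizeMillis);
            windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(element);
        }
        List<T> out = new ArrayList<>();
        for (List<T> contents : windows.values()) {
            // The "Java sorting" step: order the window's elements by event time.
            contents.sort(Comparator.comparingLong(timestampOf));
            out.addAll(contents);
        }
        return out;
    }

    public static void main(String[] args) {
        // Seconds 01, 04, 03, 02 after 09:28:00, in milliseconds, as in the question.
        List<Long> arrival = Arrays.asList(1_000L, 4_000L, 3_000L, 2_000L);
        System.out.println(sortWithinWindows(arrival, t -> t, 5_000L)); // prints [1000, 2000, 3000, 4000]
    }
}
```

The window size bounds how long records are held back: in a real stream, the sorted contents of a window can only be released once that window is complete (in Flink, when the watermark passes the window end).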

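The second suggestion can be sketched the same way. This plain-Java class (hypothetical name `WatermarkSorter`, not a Flink API) keeps records in a `java.util.PriorityQueue` ordered by timestamp and, whenever a watermark arrives, releases every buffered record whose timestamp is at or below it. Inside Flink, the same buffer would live in a custom operator and be drained when the operator processes a watermark; that wiring is not shown here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

// Framework-free sketch of the priority-queue approach: buffer records in a
// min-heap ordered by event time and, on each watermark, emit every buffered
// record whose timestamp is <= the watermark, in timestamp order.
final class WatermarkSorter<T> {
    private final ToLongFunction<T> timestampOf;
    private final PriorityQueue<T> buffer;

    WatermarkSorter(ToLongFunction<T> timestampOf) {
        this.timestampOf = timestampOf;
        this.buffer = new PriorityQueue<>(Comparator.comparingLong(timestampOf));
    }

    // Buffer an incoming record; nothing is emitted until a watermark arrives.
    void add(T record) {
        buffer.add(record);
    }

    // A watermark W signals that no more records with timestamp <= W are
    // expected, so all buffered records up to W can be released in order.
    List<T> onWatermark(long watermark) {
        List<T> ready = new ArrayList<>();
        while (!buffer.isEmpty() && timestampOf.applyAsLong(buffer.peek()) <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        WatermarkSorter<Long> sorter = new WatermarkSorter<>(t -> t);
        sorter.add(1_000L);
        sorter.add(4_000L);
        System.out.println(sorter.onWatermark(2_000L)); // prints [1000]
        sorter.add(3_000L);
        sorter.add(2_000L);
        System.out.println(sorter.onWatermark(4_000L)); // prints [2000, 3000, 4000]
    }
}
```

The sorter is only as good as the watermarks it receives: if the watermark has already passed 09:28:03 when that record arrives, the record is released after later ones. With out-of-order input like the one in the question, the watermark therefore has to trail the largest timestamp seen by at least the expected amount of disorder.
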
Cheers,
Till

On Wed, Sep 7, 2016 at 12:15 PM, jiecxy <[hidden email]> wrote:
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-assign-timestamp-for-event-time-in-a-stream-tp8944.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.