How to assign timestamp for event time in a stream?
Posted by jiecxy on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-assign-timestamp-for-event-time-in-a-stream-tp8944.html
My program reads unordered records from a log file and should print them in timestamp order. However, the output order never changes. Is there anything wrong with my code? Could anyone give me an example?
This is my program:
Note: the Tokenizer class splits each log line into four parts, for example:
Sep 6 09:28:01 master systemd: Stopping user-988.slice.
becomes
Tuple4<time, master, systemd, Stopping user-988.slice.>
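The Tokenizer itself isn't shown, so here is a hypothetical plain-Java sketch of the parsing it would need to do. It is an illustration under stated assumptions, not the real class: syslog lines carry no year, so the year is fixed to an assumed value, the time is interpreted as UTC, and a small holder class stands in for Flink's Tuple4 so the example has no Flink dependency.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical parser illustrating what a Tokenizer like the one above might do.
// Assumptions: the year (absent from syslog lines) is ASSUMED_YEAR, and times are UTC.
final class SyslogLineParser {
    static final int ASSUMED_YEAR = 2016;
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu MMM d HH:mm:ss", Locale.US);

    // Plain holder mirroring the four Tuple4 fields (f0..f3) from the post.
    static final class Record {
        final long timestampMillis;
        final String host;
        final String program;
        final String message;

        Record(long timestampMillis, String host, String program, String message) {
            this.timestampMillis = timestampMillis;
            this.host = host;
            this.program = program;
            this.message = message;
        }
    }

    static Record parse(String line) {
        // e.g. ["Sep", "6", "09:28:01", "master", "systemd:", "Stopping user-988.slice."]
        String[] parts = line.trim().split("\\s+", 6);
        if (parts.length < 6) {
            throw new IllegalArgumentException("Unexpected log line: " + line);
        }
        String dateText = ASSUMED_YEAR + " " + parts[0] + " " + parts[1] + " " + parts[2];
        long millis = LocalDateTime.parse(dateText, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
        // Drop the trailing ':' after the program name ("systemd:" -> "systemd").
        String program = parts[4].endsWith(":")
                ? parts[4].substring(0, parts[4].length() - 1)
                : parts[4];
        return new Record(millis, parts[3], program, parts[5]);
    }
}
```

Splitting on `\\s+` also copes with syslog's habit of padding single-digit days with an extra space ("Sep  6").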
---------------------------------------------------------------------------------------------------------
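import java.util.Properties;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// set up the execution environment: event time, one parallel instance
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
// Kafka 0.8 consumer settings (addresses and group id are placeholders)
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "log-reader");
DataStream<String> text = env.addSource(new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(), properties));
// split each line into (timestamp, host, program, message) and use f0 as the event timestamp
DataStream<Tuple4<Long, String, String, String>> messages = text
    .flatMap(new Tokenizer())
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<Long, String, String, String>>() {
        @Override
        public long extractAscendingTimestamp(Tuple4<Long, String, String, String> tuple4) {
            return tuple4.f0;
        }
    });
messages.print().setParallelism(1);
env.execute(); // nothing runs until the job is executed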
---------------------------------------------------------------------------------------------------------
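To see why the output keeps its arrival order, here is a simplified plain-Java model of how I understand AscendingTimestampExtractor to behave. It is not Flink code, and the class and method names are made up: the assigner only attaches a timestamp and forwards the element immediately, its watermark is the highest timestamp seen minus 1, and a timestamp that goes backwards is handed to a violation handler which, by default, only logs it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model (not the Flink API) of an ascending-timestamp assigner.
final class AscendingAssignerModel {
    private long currentTimestamp = Long.MIN_VALUE;
    final List<Long> forwarded = new ArrayList<>();   // order in which elements leave the assigner
    final List<Long> violations = new ArrayList<>();  // timestamps that went backwards

    void onElement(long timestamp) {
        if (timestamp >= currentTimestamp) {
            currentTimestamp = timestamp;
        } else {
            // Flink's default handler is understood to just log a warning; nothing is reordered.
            violations.add(timestamp);
        }
        // The element is passed downstream right away, in arrival order.
        forwarded.add(timestamp);
    }

    // Watermark = highest timestamp seen so far minus 1.
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }
}
```

Feeding it the sample timestamps below (1, 4, 3, 2 seconds past 09:28:00) forwards them as 1, 4, 3, 2, flags 3 and 2 as violations, and leaves the watermark at 3. Since print() writes each element as soon as it arrives, the printed order matches the input.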
The input looks like this:
Sep 6 09:28:01 master systemd: Stopping user-988.slice1.
Sep 6 09:28:04 master systemd: Stopping user-988.slice4.
Sep 6 09:28:03 master systemd: Stopping user-988.slice3.
Sep 6 09:28:02 master systemd: Stopping user-988.slice2.
But the output is the same as the input: the records are not reordered to 1 - 2 - 3 - 4 (ordered by time).
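Assigning timestamps alone doesn't move any records, so sorting needs an operator that buffers records and releases them in timestamp order as the watermark advances. Here is a plain-Java sketch of that idea, not Flink API (all names are hypothetical), assuming a watermark that trails the largest timestamp seen by a fixed bound. In a Flink job this buffering would have to live in an operator downstream of a timestamp assigner whose watermark allows for that much disorder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of event-time sorting: buffer timestamps in a min-heap and
// release them once the watermark (max seen - bound) has passed them.
final class EventTimeSorterModel {
    private final long maxOutOfOrderness;
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long maxSeen = Long.MIN_VALUE;
    final List<Long> output = new ArrayList<>();

    EventTimeSorterModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    void onElement(long timestamp) {
        buffer.add(timestamp);
        maxSeen = Math.max(maxSeen, timestamp);
        advanceWatermark(maxSeen - maxOutOfOrderness);
    }

    // Emit every buffered element whose timestamp is at or below the watermark, smallest first.
    // An element that arrives after the watermark has passed it is emitted immediately,
    // so disorder larger than the bound is NOT corrected.
    void advanceWatermark(long watermark) {
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            output.add(buffer.poll());
        }
    }

    // At end of input, flush whatever is still buffered.
    void endOfInput() {
        advanceWatermark(Long.MAX_VALUE);
    }
}
```

With a bound of 3 seconds, the sample timestamps 1, 4, 3, 2 come out as 1, 2, 3, 4. With a bound of 0 they come out as 1, 4, 3, 2, because 3 and 2 arrive after the watermark has already passed them. The bound is a trade-off: a larger one corrects more disorder but holds records back longer.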