The program is supposed to read unordered records from a log file and print them in order. But it doesn't change the order. Is there anything wrong in my code? Can anyone give me an example?
This is my program. Note: the Tokenizer class splits each log line into four parts. For example,

Sep 6 09:28:01 master systemd: Stopping user-988.slice.

becomes

Tuple4<time, master, systemd, Stopping user-988.slice.>

---------------------------------------------------------------------------------------------------------
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// read raw log lines from the Kafka topic "test"
DataStream<String> text = env.addSource(
        new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(), properties));

// parse each line and use the first tuple field as the event timestamp
DataStream<Tuple4<Long, String, String, String>> messages = text
        .flatMap(new Tokenizer())
        .assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<Tuple4<Long, String, String, String>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple4<Long, String, String, String> tuple4) {
                        return tuple4.f0;
                    }
                });

messages.print().setParallelism(1);
---------------------------------------------------------------------------------------------------------

The input looks like this:

Sep 6 09:28:01 master systemd: Stopping user-988.slice1.
Sep 6 09:28:04 master systemd: Stopping user-988.slice4.
Sep 6 09:28:03 master systemd: Stopping user-988.slice3.
Sep 6 09:28:02 master systemd: Stopping user-988.slice2.

But the output is the same as the input; the records are not reordered to 1 - 2 - 3 - 4 (ordered by time)...
Hi,

using event time and assigning timestamps does not order the stream records. In order to do that you can define a window and sort the elements in each window using Java sorting, for example. Alternatively, you can write your own operator which has a priority queue and always emits the elements up to the current watermark.

Cheers,
Till

On Wed, Sep 7, 2016 at 12:15 PM, jiecxy <[hidden email]> wrote:
The program is to read the unordered records from a log file, and to print ...
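To make the priority-queue suggestion concrete, here is a minimal sketch of just the buffering logic in plain Java, with no Flink dependency. The class `WatermarkSorter` and its methods `add` and `advanceWatermark` are hypothetical names chosen for illustration, not Flink API; inside a real job this logic would presumably live in a custom operator that receives the elements and the watermarks from the runtime.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper (not part of Flink): buffers records and releases them
// in timestamp order once the watermark has passed them.
class WatermarkSorter {

    // One buffered record: the event timestamp plus the original log line.
    static final class Record {
        final long timestamp;
        final String line;

        Record(long timestamp, String line) {
            this.timestamp = timestamp;
            this.line = line;
        }
    }

    // Min-heap ordered by event timestamp, so the oldest record is on top.
    private final PriorityQueue<Record> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Record r) -> r.timestamp));

    // Called for every incoming element: buffer it, emit nothing yet.
    void add(long timestamp, String line) {
        buffer.add(new Record(timestamp, line));
    }

    // Called when the watermark advances: emit every buffered record whose
    // timestamp is <= the watermark, smallest timestamp first.
    List<String> advanceWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            out.add(buffer.poll().line);
        }
        return out;
    }
}
```

With the four sample lines added in their arrival order (timestamps 1, 4, 3, 2, simplified here to small integers), advancing the watermark to 3 releases slice1, slice2, slice3, and advancing it to 4 then releases slice4. Note that this only helps if the watermark trails the out-of-order records; an extractor that assumes ascending timestamps would likely push the watermark past them before they arrive.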