Hello,
I extracted timestamps from my sources using BoundedOutOfOrdernessTimestampExtractor, then applied a WindowFunction, and found that my timestamps had been lost: to do another window operation, I have to extract the timestamps again. I tried to find documentation on this but haven't found any. Could you please tell me which types of operators preserve a record's timestamp?

The same question applies to keyed streams. I use the same key throughout my flow, but with many transformations in between (different operators, including a CoProcessFunction, and conversions of my data between different classes), and I have been trying to use DataStreamUtils.reinterpretAsKeyedStream. Is it safe to assume that, as long as I don't transform the key, I can use reinterpretAsKeyedStream?

Thanks and best regards,
Averell
-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi,

Most operators preserve the timestamp of an input element, if it has one. Window is a special case: the timestamp of an element emitted by a window is the maxTimestamp of the window that fired, and different window types implement this differently (GlobalWindow, TimeWindow, custom windows).

keyBy only shuffles the data, so I don't think it can affect an element's timestamp.

Hope this helps.
Best,
Guowei

On Tue, 30 Apr 2019 at 07:28, Averell <[hidden email]> wrote:
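Guowei's description of window timestamps can be illustrated with a small plain-Java model (not Flink code). It assumes a tumbling event-time window with zero offset and non-negative timestamps in milliseconds. It follows Flink's TimeWindow convention that maxTimestamp() is the window end minus 1 ms. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalTime;

/**
 * Plain-Java model (not Flink code) of tumbling event-time windows: which window a
 * timestamp falls into, and which timestamp that window's output records carry.
 * Assumes zero window offset and non-negative timestamps; names are hypothetical.
 */
final class WindowTimestamps {

    /** Start of the tumbling window of length {@code size} that contains {@code ts}. */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Timestamp given to results of that window: window end minus 1 ms (cf. TimeWindow#maxTimestamp). */
    static long emittedTimestamp(long ts, long size) {
        long end = windowStart(ts, size) + size;
        return end - 1;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(15).toMillis();
        // 10:00 expressed as milliseconds since midnight
        long tenOClock = LocalTime.of(10, 0).toNanoOfDay() / 1_000_000L;
        long out = emittedTimestamp(tenOClock, size);
        System.out.println(LocalTime.ofNanoOfDay(out * 1_000_000L)); // prints 10:14:59.999
    }
}
```

Every record that falls into [10:00, 10:15) leaves the window stamped 10:14:59.999, whatever its own timestamp was.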
Hi,

Actually, all operators should preserve record timestamps if the TimeCharacteristic is set to event time. A window operator sets the timestamp of all emitted records to the end timestamp of the window. I'm not sure what happens if you use a processing-time window in an event-time application, though...

Can you show a concise example of your program and explain how you check the timestamps?

In general, it is not a good idea to assign timestamps and watermarks in the middle of a program, because it can be quite hard to reason about out-of-orderness after the data has been shuffled and processed.

You can use reinterpretAsKeyedStream() if an operator does not change the keys and if the parallelism of the source and target operators is the same.

Best,
Fabian

On Tue, 30 Apr 2019 at 08:59, Guowei Ma <[hidden email]> wrote:
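Fabian's parallelism condition for reinterpretAsKeyedStream() can be explained with a simplified model of keyBy routing. As far as I know, Flink first assigns each key to a key group, which is a murmur hash of key.hashCode() modulo the max parallelism. It then maps that key group to a subtask as keyGroup * parallelism / maxParallelism. The sketch replaces the murmur hash with a plain floorMod of hashCode(), so its concrete numbers differ from Flink's. The class and method names are hypothetical.

```java
/**
 * Simplified model of how keyBy routes a key to a parallel subtask.
 * Assumption: Flink murmur-hashes key.hashCode(); plain hashCode() is used here instead.
 */
final class KeyRouting {

    /** Key group of a key (stand-in hash, see above). */
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Index of the subtask that owns the key's group for the given parallelism. */
    static int subtask(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        Integer key = 100; // hypothetical key value
        // Same key, same parallelism: upstream and downstream agree on the owner.
        System.out.println(subtask(key, maxParallelism, 4) == subtask(key, maxParallelism, 4)); // true
        // Same key, different parallelism: the owner changes (3 vs 2).
        System.out.println(subtask(key, maxParallelism, 4) + " vs " + subtask(key, maxParallelism, 3));
    }
}
```

Reinterpreting a stream as keyed is only safe if every record already sits on the subtask that keyBy would pick for its key. In this model, that fails as soon as the key or the parallelism differs.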
Hi Fabian, Guowei,

Thanks for the help. My flow is shown in the attached screenshot, where (1) and (2) are the main data streams from file sources, while (3) and (4) are the enrichment data, also from file sources.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2019-05-01_at_08.png>

(5) merges and parses (1) and (2). It consists of:
- A tumbling window function with an early trigger based on the number of records in the window: it FIREs as soon as there has been at least one message from each of streams 1 and 2, without waiting for the window end time.
- A flat map function to parse the incoming messages.
- A filter and a map.

(6) works as a data enricher, enriching the output of (5) with data from (3) and (4). As (4) is broadcast, my implementation of (6) looks like this:

    stream5.union(stream3).keyBy(key2).connect(stream4).process(new MyFunction6())

where MyFunction6 extends KeyedBroadcastProcessFunction. In this function, one message from (5) triggers one output, while a message from (3) or (4) does not emit any records and only updates the state.

Regarding message types:
- The outputs of (1) and (2) are of the same type, EventType1.
- The output of (3) is of type EventType2_1, which extends EventType2.
- The output of (5) is of type EventType2_2, which extends EventType2.
- The inputs of (6) are of type EventType2 (from the unioned, keyed stream) and of type Type3 (from the broadcast stream).
- The output of (6) is of type EventType2_3, which is mapped from EventType2_1.

As seen in my screenshot, only (5) showed a watermark; neither (6) nor (7) did. I noticed the problem because (7) didn't work as expected, and when I put an event-time extractor between (6) and (7), (7) worked.

While typing all this, I think I have found where my issue comes from: I have not assigned timestamps/watermarks to (3) and (4), because I thought they were just idle sources of enrichment data.

Because of this, I have another question: I read the section on idling sources [1], but I am not sure how to implement that for my file sources.
Could you please recommend a solution or good practice here?

I have one more question about the recommendation [2] to emit timestamps and watermarks from within the source function: is there any way to do that with file sources?

Thanks and best regards,
Averell

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#idling-sources
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html#source-functions-with-timestamps-and-watermarks
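The enrichment step (6) described above can be sketched as a plain-Java model of the KeyedBroadcastProcessFunction's logic. This is not Flink code: two HashMaps stand in for keyed state and broadcast state. The method names, the String payloads and the "config" broadcast entry are all hypothetical. The model also reflects one point relevant to this thread: in Flink, records emitted while processing an element carry that element's timestamp (when it has one). The output of (6) therefore inherits the timestamp of the (5) record that triggered it.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of enrichment step (6); all names are hypothetical, not Flink API. */
final class EnrichmentModel {

    /** Output of (6): the enriched event, keeping the triggering event's timestamp. */
    static final class Enriched {
        final String payload;
        final long timestamp;

        Enriched(String payload, long timestamp) {
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, String> keyedState = new HashMap<>();     // per key2, fed by (3)
    private final Map<String, String> broadcastState = new HashMap<>(); // shared by all keys, fed by (4)

    /** A record from (3): updates keyed state only, emits nothing. */
    void onStream3(String key2, String value) {
        keyedState.put(key2, value);
    }

    /** A record from (4): updates broadcast state only, emits nothing. */
    void onStream4(String name, String value) {
        broadcastState.put(name, value);
    }

    /** A record from (5): emits exactly one enriched record with the input's timestamp. */
    Enriched onStream5(String key2, String event, long timestamp) {
        String keyed = keyedState.getOrDefault(key2, "none");
        String shared = broadcastState.getOrDefault("config", "none");
        return new Enriched(event + "/" + keyed + "/" + shared, timestamp);
    }

    public static void main(String[] args) {
        EnrichmentModel fn = new EnrichmentModel();
        fn.onStream3("k1", "meta");   // no output
        fn.onStream4("config", "v2"); // no output
        Enriched out = fn.onStream5("k1", "evt", 36_000_000L);
        System.out.println(out.payload + " @ " + out.timestamp); // evt/meta/v2 @ 36000000
    }
}
```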
Hi Fabian, Guowei,
I have some updates:

1. I added timestamp and watermark extractors to all of my remaining sources (3 and 4), and the watermark now does propagate to my final operator.

2. As I could not find a way to set my file sources as IDLE, I tried to tweak the class ContinuousFileReaderOperator so that those readers are always IDLE:

        nextElement = format.nextRecord(nextElement);
        if (nextElement != null) {
            readerContext.collect(nextElement);
            // mark only the readers of the enrichment files as idle
            if (this.format.getFilePaths()[0].getPath().contains("<myPath>")) {
                readerContext.markAsTemporarilyIdle();
            }
        } else {
            // ...
        }

The result was that there was no watermark at all for that stream, and the IDLE status did not seem to be taken into account (my CEP operator didn't generate any output). So I do not understand what the IDLE StreamStatus is for.

My temporary solution, for now, is to use MAX_WATERMARK as the watermark for those idle sources. Is doing that recommended?

Thanks for your help.
Regards,
Averell
Hi Averell,

The watermark of a stream is always the lowest watermark of all its input streams. If one of the input streams does not have watermarks, Flink does not compute a watermark for the merged stream.

If you do not need time-based operations on streams 3 and 4, setting their watermark to MAX_WATERMARK should be a good solution.

Best,
Fabian

On Wed, 1 May 2019 at 08:50, Averell <[hidden email]> wrote:
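Fabian's rule can be written down as a small plain-Java model (not Flink code). An operator's watermark is the minimum over its inputs, and an input that never emitted a watermark counts as Long.MIN_VALUE. Following the idea of idle sources in [1], inputs marked idle are left out of the minimum. The model ignores the fact that Flink never lets an operator's watermark move backwards. It also does not cover the case where every input is idle. Names are hypothetical.

```java
/**
 * Plain-Java model of how an operator with several inputs derives its watermark:
 * the minimum over all inputs that are not marked idle. Names are hypothetical.
 */
final class WatermarkCombiner {
    static final long NO_WATERMARK = Long.MIN_VALUE;  // an input that has never emitted a watermark
    static final long MAX_WATERMARK = Long.MAX_VALUE; // same value as Flink's Watermark.MAX_WATERMARK

    /** Requires at least one non-idle input; the all-idle case is not modelled. */
    static long combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (!anyActive) {
            throw new IllegalArgumentException("all inputs idle: not covered by this model");
        }
        return min;
    }

    public static void main(String[] args) {
        long tenOClock = 36_000_000L; // 10:00 as ms since midnight
        boolean[] noneIdle = {false, false};
        // (5) has watermarks, (3) never emits one: the combined watermark cannot advance.
        System.out.println(combine(new long[] {tenOClock, NO_WATERMARK}, noneIdle) == NO_WATERMARK); // true
        // (3) emits MAX_WATERMARK: it no longer holds back the combined watermark.
        System.out.println(combine(new long[] {tenOClock, MAX_WATERMARK}, noneIdle)); // 36000000
    }
}
```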
Thank you, Fabian.

I have one more question about timestamps. In the previous email you asked how I checked the timestamps, and I didn't have an answer: I had only checked the watermarks, not the timestamps, under the (wrong) assumption that watermarks advance along with timestamps.

Today I played with that early-trigger window, putting its output into a table, and found that the timestamp is set to the window's end time, but the watermark is not. (My window is [10:00, 10:15); my incoming messages both have a timestamp of 10:00, which triggers one early output with timestamp 10:14:59.999, but the watermark stays at 10:00.)

So my question is: what is the easiest way to check the timestamp of a message?

Thanks and regards,
Averell
Hi Averell,

Yes, timestamps and watermarks do not (completely) move together. The watermark should always be lower than the timestamps of the records currently being processed; otherwise, the records might be processed as late records (depending on the logic).

The easiest way to check the timestamp of a message is to use a ProcessFunction: the Context of the processElement() method has a timestamp() method that returns the timestamp of the current record.

Best,
Fabian

On Fri, 3 May 2019 at 06:08, Averell <[hidden email]> wrote:
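The gap between timestamps and the watermark shows up in a plain-Java model of a bounded-out-of-orderness generator. The model assumes the rule behind BoundedOutOfOrdernessTimestampExtractor: the watermark is the largest timestamp seen so far minus a fixed bound, while every record keeps its own timestamp. This is a sketch with hypothetical names, not the Flink class.

```java
/** Plain-Java model of a bounded-out-of-orderness watermark generator; names are hypothetical. */
final class BoundedOutOfOrderness {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    /** Assumes maxOutOfOrderness >= 0. */
    BoundedOutOfOrderness(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start value chosen so the first watermark is Long.MIN_VALUE instead of underflowing.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Called per record: the record keeps its own timestamp; only the maximum is tracked. */
    long onRecord(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return timestamp;
    }

    /** Watermark = highest timestamp seen so far minus the allowed out-of-orderness. */
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedOutOfOrderness gen = new BoundedOutOfOrderness(0L);
        gen.onRecord(36_000_000L); // 10:00 from stream (1)
        gen.onRecord(36_000_000L); // 10:00 from stream (2)
        System.out.println(gen.currentWatermark()); // 36000000, i.e. 10:00
        long earlyFiringTimestamp = 36_899_999L;    // 10:14:59.999, window end minus 1 ms
        System.out.println(earlyFiringTimestamp > gen.currentWatermark()); // true
    }
}
```

In this model, the early-fired window result is stamped well ahead of the watermark. The watermark only moves when new, larger input timestamps arrive.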
Thank you, Fabian.

One more question from me on this topic: as I send out early messages from my window function, the timestamp assigned by the window (the end time of the window) is not what I expect. I want it to be the timestamp of the (last) message that triggered the output. Is there any way to accomplish that? Currently, I have an assignTimestampsAndWatermarks after my window function, but, as you said, that goes against best practice.

Thanks and regards,
Averell
The window operator cannot be configured to use the max timestamp of the events in the window as the timestamp of the output record. The reason is that such behavior can produce late records. If you want to do that, you have to track the max timestamp yourself and assign it with a timestamp assigner.

Best,
Fabian

On Fri, 3 May 2019 at 09:54, Averell <[hidden email]> wrote:
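The following is a sketch of what Fabian suggests, written as plain Java with hypothetical names. The window logic records the maximum event timestamp next to its result. A timestamp assigner placed after the window (like the assignTimestampsAndWatermarks call Averell already has) then returns that field. In Flink, the Acc part would live in the window/aggregate function and extractTimestamp() in the assigner. As Fabian notes, timestamps chosen this way can end up behind the downstream watermark, so such records may be treated as late.

```java
/** Plain-Java model: carry the max event timestamp inside the window result; names are hypothetical. */
final class MaxTimestampTracking {

    /** Window result that also carries the timestamp of the latest contributing event. */
    static final class Result {
        final int count;
        final long maxEventTimestamp;

        Result(int count, long maxEventTimestamp) {
            this.count = count;
            this.maxEventTimestamp = maxEventTimestamp;
        }
    }

    /** Per-window accumulator, updated once per element. */
    static final class Acc {
        int count;
        long maxTs = Long.MIN_VALUE;

        void add(long eventTimestamp) {
            count++;
            maxTs = Math.max(maxTs, eventTimestamp);
        }

        Result result() {
            return new Result(count, maxTs);
        }
    }

    /** What a downstream timestamp assigner would return for a result record. */
    static long extractTimestamp(Result r) {
        return r.maxEventTimestamp;
    }

    public static void main(String[] args) {
        Acc acc = new Acc();
        acc.add(36_000_000L); // 10:00:00 from stream (1)
        acc.add(36_030_000L); // 10:00:30 from stream (2), causes the early firing
        Result early = acc.result();
        System.out.println(extractTimestamp(early)); // 36030000
    }
}
```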
Thank you, Fabian.