Timestamp and key preservation over operators


Averell
Hello,

I extracted timestamps from my sources using BoundedOutOfOrdernessTimestampExtractor,
applied a WindowFunction, and found that my timestamps have been lost.
To do another window operation, I need to extract the timestamps again. I tried
to find documentation on this but haven't found any.
Could you please tell me which types of operators preserve records'
timestamps?

I have the same question for keyed streams. I have been using the same key
throughout my flow, but with many transformations (using different operators,
including CoProcessFunction, and converting my data between different
classes), and I have been trying to use
DataStreamUtils.reinterpretAsKeyedStream. Is it safe to assume that, as long
as I don't transform the key, I can use the
reinterpretAsKeyedStream function?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Timestamp and key preservation over operators

Guowei Ma
Hi,
Most operators preserve the timestamp of an input element if it has one.
The window operator is a special case: the timestamp of the elements it emits is the maxTimestamp of the window that was triggered. Different window types (GlobalWindow/TimeWindow/custom windows) implement this differently.
keyBy only shuffles the data, so I don't think it affects an element's timestamp.

Hope this could help.

Best,
Guowei


(In reply to Averell <[hidden email]>, Tuesday, April 30, 2019, 7:28 AM.)

Re: Timestamp and key preservation over operators

Fabian Hueske-2
Hi,

Actually, all operators should preserve record timestamps if the TimeCharacteristic is correctly set to event time.
A window operator sets the timestamp of all emitted records to the end timestamp of the window.
I am not sure what happens if you use a processing-time window in an event-time application, though...
Can you show a concise example of your program and explain how you check the timestamps?

In general, it is not a good idea to assign timestamps and watermarks in the middle of a program, because it can be quite hard to reason about out-of-orderness after the data has been shuffled and processed.

You can use reinterpretAsKeyedStream() if an operator does not change the keys and if the source and target operators have the same parallelism.
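
To illustrate why the parallelism condition matters, here is a minimal plain-Java model (not Flink code) of how a keyed record is routed to a parallel subtask. The subtask formula follows my understanding of Flink's key-group assignment; the hash step is deliberately simplified (Flink additionally applies a murmur hash), and the class and method names are made up for this sketch.

```java
// Simplified, hypothetical model of key-to-subtask routing; not Flink's actual classes.
final class KeyRoutingModel {

    // Simplification: Flink additionally applies a murmur hash to key.hashCode().
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each subtask owns a contiguous range of key groups.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int keyGroup = keyGroupFor(100, maxParallelism); // the hash code of Integer 100 is 100
        System.out.println("parallelism 2 -> subtask " + subtaskFor(keyGroup, maxParallelism, 2));
        System.out.println("parallelism 4 -> subtask " + subtaskFor(keyGroup, maxParallelism, 4));
    }
}
```

In this model the same key lands on a different subtask when the parallelism changes, so a stream is only "already keyed" for a downstream operator if its records sit on exactly the subtask that keyBy would have chosen.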

Best,
Fabian

(In reply to Guowei Ma <[hidden email]>, Tuesday, April 30, 2019, 08:59.)

Re: Timestamp and key preservation over operators

Averell
Hi Fabian, Guowei

Thanks for the help. My flow is shown in the attached screenshot, where (1) and (2) are
the main data streams from file sources, while (3) and (4) are the
enrichment data, also from file sources.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2019-05-01_at_08.png>

(5) merges and parses (1) and (2). It consists of:
        A tumbling window function with an early trigger (based on the number of
records in the window: FIRE once there has been at least one message from each of
streams 1 and 2, without waiting for the window end time)
        A flat map function to parse the incoming messages
        A filter and a map
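
The early-firing rule described above can be modeled in a few lines of plain Java. This is only a sketch of the decision logic, not a Flink Trigger implementation; the class, enum, and method names are hypothetical, and it assumes the rule keeps firing on every later message once both streams have been seen.

```java
import java.util.EnumSet;

// Hypothetical model of the early-firing rule; not a Flink Trigger implementation.
final class EarlyFireModel {

    enum Source { STREAM_1, STREAM_2 }

    // Sources that have contributed at least one message to the current window.
    private final EnumSet<Source> seen = EnumSet.noneOf(Source.class);

    // Returns true (FIRE) once every source has been seen, false (CONTINUE) otherwise.
    boolean onElement(Source source) {
        seen.add(source);
        return seen.size() == Source.values().length;
    }

    // Called when the window is purged, so the next window starts empty.
    void clear() {
        seen.clear();
    }

    public static void main(String[] args) {
        EarlyFireModel trigger = new EarlyFireModel();
        System.out.println(trigger.onElement(Source.STREAM_1));
        System.out.println(trigger.onElement(Source.STREAM_1));
        System.out.println(trigger.onElement(Source.STREAM_2));
    }
}
```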

(6) works as a data enricher: it enriches the output of (5) with data from (3) and
(4). As (4) is broadcast, my implementation of (6) looks like this:
        stream5.union(stream3).keyBy(key2).connect(stream4).process(new MyFunction6())
where MyFunction6 extends KeyedBroadcastProcessFunction.
In this KeyedBroadcastProcessFunction, one message from (5) triggers one
output, while a message from (3) or (4) does not emit any records but only updates
the state.

Regarding message types:
        The outputs of (1) and (2) are of the same type, EventType1.
        The output of (3) is of type EventType2_1, which extends EventType2.
        The output of (5) is of type EventType2_2, which extends EventType2.
        The input of (6) is of type EventType2 (from the unioned keyed stream) and of
type Type3 (from the broadcast stream).
        The output of (6) is of type EventType2_3, which is mapped from EventType2_1.

As seen in my screenshot, only (5) showed a watermark; (6) and (7) did not. I
noticed the problem because my (7) didn't work as expected. When I put
an eventTimeExtractor between (6) and (7), (7) worked.

Having typed all of this, I think I now know where my issue came
from: I did not assign timestamps/watermarks for (3) and (4) because I
thought they were just idle sources of enrichment data.

Because of this, I have another question:
I read the text regarding idling sources [1], but I am not sure how to implement
that for my file sources. Could you please recommend a
solution or good practice here?

I have one more question, about the recommendation [2] to emit timestamps and
watermarks from within the source function. Is there any way to do that with
file sources?

Thanks and best regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#idling-sources
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html#source-functions-with-timestamps-and-watermarks




Re: Timestamp and key preservation over operators

Averell
Hi Fabian, Guowei,

I have some updates:
1. I added a timestamp and watermark extractor on all of my remaining sources (3 and
4), and the watermark now propagates to my final operator.
2. As I could not find a way to mark my file sources as IDLE, I tried to
tweak the class ContinuousFileReaderOperator so that it is always IDLE:
        nextElement = format.nextRecord(nextElement);
        if (nextElement != null) {
                readerContext.collect(nextElement);
                // mark the source idle after each record read from my enrichment path
                if (this.format.getFilePaths()[0].getPath().contains("<myPath>"))
                        readerContext.markAsTemporarilyIdle();
        } else {
The result was that there was no watermark at all for that stream, and
the IDLE status did not seem to be taken into account (my CEP operator didn't
generate any output). So I do not understand what the IDLE StreamStatus is
for.
My temporary solution, for now, is to use MAX_WATERMARK for those idle
sources. Is doing that recommended?

Thanks for your help.
Regards,
Averell






Re: Timestamp and key preservation over operators

Fabian Hueske-2
Hi Averell,

The watermark of a stream is always the low watermark of all its input streams. If one of the input streams does not have watermarks, Flink does not compute a watermark for the merged stream.
If you do not need time-based operations on streams 3 and 4, setting the watermark to MAX_WATERMARK should be a good solution.
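
The effect can be reproduced with a small plain-Java model (not Flink code; class and method names are made up for illustration). It assumes that an input which has never emitted a watermark is represented by Long.MIN_VALUE, and relies on Watermark.MAX_WATERMARK carrying the timestamp Long.MAX_VALUE.

```java
import java.util.Arrays;

// Plain-Java model of watermark merging; class and method names are hypothetical.
final class WatermarkMergeModel {

    // Assumption: an input that never emitted a watermark is modeled as Long.MIN_VALUE.
    static final long NO_WATERMARK = Long.MIN_VALUE;
    // Flink's Watermark.MAX_WATERMARK carries the timestamp Long.MAX_VALUE.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // The merged watermark is the minimum over all input watermarks.
    static long merged(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    public static void main(String[] args) {
        long mainStream = 1_000L;
        // An enrichment input without watermarks holds the merged watermark back.
        System.out.println(merged(mainStream, NO_WATERMARK) == NO_WATERMARK);
        // With MAX_WATERMARK the enrichment input no longer influences the minimum.
        System.out.println(merged(mainStream, MAX_WATERMARK) == mainStream);
    }
}
```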

Best, Fabian

(In reply to Averell <[hidden email]>, Wednesday, May 1, 2019, 08:50.)

Re: Timestamp and key preservation over operators

Averell
Thank you Fabian.

I have one more question about timestamps:
In the previous email, you asked how I checked the timestamps. I don't
have an answer: back then I only checked the watermark, not the timestamps. I had
the (wrong) assumption that watermarks advance along with timestamps.
Today I played with that early-trigger window, putting the output into a
table, and found that the timestamp is set to the window's end time, but the
watermark apparently is not. (My window is [10:00-10:15), and my incoming messages both have
a timestamp of 10:00; they trigger one early output with timestamp
10:14:59.999, but the watermark stays at 10:00.)
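
The numbers in this observation can be checked with a small plain-Java calculation. It is a sketch of the tumbling-window arithmetic only (window start aligned to the window size, output timestamp equal to the last millisecond inside the window), with timestamps expressed as milliseconds of the day for readability; the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalTime;

// Plain-Java model of tumbling-window arithmetic; names are hypothetical.
final class TumblingWindowModel {

    // Start of the size-aligned window that contains the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Last millisecond that still belongs to the window [start, start + size).
    static long maxTimestamp(long windowStart, long size) {
        return windowStart + size - 1;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(15).toMillis();
        long eventTime = LocalTime.of(10, 0).toNanoOfDay() / 1_000_000L; // 10:00 as millis of day
        long start = windowStart(eventTime, size);
        long outputTimestamp = maxTimestamp(start, size);
        System.out.println(LocalTime.ofNanoOfDay(start * 1_000_000L));
        System.out.println(LocalTime.ofNanoOfDay(outputTimestamp * 1_000_000L));
    }
}
```

An early firing only stamps the emitted record with this value; it does not move the watermark, which still follows the timestamps seen at the sources.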

Thus, my question: what is the easiest way to check the timestamp of a
message?

Thanks and regards,
Averell




Re: Timestamp and key preservation over operators

Fabian Hueske-2
Hi Averell,

Yes, timestamps and watermarks do not (completely) move together.
The watermark should always be lower than the timestamps of the currently processed records.
Otherwise, the records might be processed as late records (depending on the logic).

The easiest way to check the timestamp of a message is using a ProcessFunction.
The Context of the processElement() method has a timestamp() method that returns the timestamp of the current record.

Best, Fabian

(In reply to Averell <[hidden email]>, Friday, May 3, 2019, 06:08.)

Re: Timestamp and key preservation over operators

Averell
Thank you Fabian.

One more question from me on this topic: as I send out early messages in my
window function, the timestamp assigned by the window function (the end time
of the window) is not what I expected. I want it to be the time of the
(last) message that triggered the output.

Is there any way to accomplish that?
Currently, I have an assignTimestampsAndWatermarks after my window function,
but, as you said, that goes against best practice.

Thanks and regards,
Averell




Re: Timestamp and key preservation over operators

Fabian Hueske-2
The window operator cannot be configured to use the max timestamp of the events in the window as the timestamp of the output record.
The reason is that such behavior can produce late records.

If you want to do that, you have to track the max timestamp and assign it yourself with a timestamp assigner.
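
A rough plain-Java model of the trade-off, not Flink code: it tracks the largest event timestamp in a window and compares both candidate output timestamps against a watermark that has already been forwarded downstream. The lateness test is simplified, and all names and numbers are illustrative assumptions.

```java
// Plain-Java model of output timestamps and lateness; names and numbers are illustrative.
final class OutputTimestampModel {

    // Largest timestamp among the events collected in a window.
    static long maxEventTimestamp(long... eventTimestamps) {
        long max = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            max = Math.max(max, ts);
        }
        return max;
    }

    // Simplified lateness test: a record at or behind the watermark may be treated as late.
    static boolean mayBeLate(long recordTimestamp, long downstreamWatermark) {
        return recordTimestamp <= downstreamWatermark;
    }

    public static void main(String[] args) {
        long windowMaxTimestamp = 899_999L;  // window [0, 900000)
        long downstreamWatermark = 600_000L; // assumed: last watermark already forwarded downstream
        long maxEvent = maxEventTimestamp(0L, 300_000L);

        // Stamped with the window end: still ahead of the downstream watermark.
        System.out.println(mayBeLate(windowMaxTimestamp, downstreamWatermark));
        // Stamped with the max event timestamp: already behind the downstream watermark.
        System.out.println(mayBeLate(maxEvent, downstreamWatermark));
    }
}
```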

Best, Fabian

(In reply to Averell <[hidden email]>, Friday, May 3, 2019, 09:54.)

Re: Timestamp and key preservation over operators

Averell