how flink extracts timestamp from transformed elements?

Jinhua Luo
Hi All,

The timestamp assigner works on one type, normally the type emitted by the
source. But after several operators, the element type changes and the
elements may be aggregated. If I then apply timeWindow() again, how does
Flink extract timestamps from the elements? For example, if a fold operator
aggregates 10 source elements into one, does it copy the last element's
timestamp to the result element?

Re: how flink extracts timestamp from transformed elements?

Fabian Hueske-2
Hi,

Timestamps are handled as metadata in Flink's DataStream API.
This means that Flink automatically maintains the timestamps and ensures that all records that were aligned with the watermarks (i.e., not late) remain aligned.
If records are aggregated in a time window, the aggregation result has the maximum allowed timestamp of the window. For example, a tumbling window of size 1 hour that starts at 14:00 emits its results with a timestamp of 14:59:59.999.

Best, Fabian
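A minimal plain-Java sketch (no Flink dependency, Java 16+ for records) of the two rules described above. It assumes a hypothetical `Timestamped` wrapper that keeps the timestamp next to the value, roughly the way Flink's runtime keeps it outside the user type: a transformation changes the value but copies the timestamp, and a tumbling-window result is stamped with the window's last millisecond (`end - 1`). That arithmetic is intended to match Flink's `TimeWindow.maxTimestamp()`, but `TimestampSketch` and its methods are illustrative names, not Flink API.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical model, not Flink API: the timestamp travels as metadata next to the value.
final class TimestampSketch {

    // Hypothetical stand-in for an element plus its event-time timestamp.
    record Timestamped<T>(T value, long timestamp) {}

    // A map-like transformation: the value may change type, the timestamp is copied unchanged.
    static <A, B> Timestamped<B> map(Timestamped<A> in, Function<A, B> f) {
        return new Timestamped<>(f.apply(in.value()), in.timestamp());
    }

    // Start of the tumbling window containing ts (assumes offset 0 and non-negative ts).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Largest timestamp still inside [start, start + size), i.e. end - 1.
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    // Sums one window's values; the result carries the window's maxTimestamp,
    // not the timestamp of any input element (e.g. not the last one).
    static Timestamped<Long> sumWindow(List<Timestamped<Long>> elements, long start, long size) {
        long sum = 0;
        for (Timestamped<Long> e : elements) {
            sum += e.value();
        }
        return new Timestamped<>(sum, maxTimestamp(start, size));
    }
}
```

For a one-hour window starting at 14:00 (50,400,000 ms after midnight UTC), `maxTimestamp(50_400_000L, 3_600_000L)` gives 53,999,999 ms, which is 14:59:59.999.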

In reply to Jinhua Luo, 2017-12-16 9:01 GMT+01:00.

Re: how flink extracts timestamp from transformed elements?

Jinhua Luo
Thanks.

The keyBy() splits the stream into multiple logical streams. If I then apply
timeWindow(), how does Flink merge all the logical windows into one?
When do the window functions get invoked: at the same time, or
individually, with Flink waiting for all window functions to finish and
then merging the results?


In reply to Fabian Hueske, 2017-12-18 18:02 GMT+08:00.

Re: how flink extracts timestamp from transformed elements?

Fabian Hueske-2
If you define a keyed window (i.e., you call keyBy() first), the results are not merged.
The window is evaluated individually for each key, and all results of windows that cover the same time range have the same timestamp.
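A minimal sketch of the "not merged, same timestamp" behavior, assuming hypothetical `Event` and `Result` records and a tumbling window with offset 0. It evaluates everything in one sequential pass, so it only models *what* is emitted: one result per (key, window), each stamped with the window's `end - 1`. It does not model *when* results fire; in Flink, each key's window is evaluated independently (possibly on different parallel subtasks) once the watermark passes the window's end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-key tumbling-window evaluation; not Flink's WindowOperator.
final class KeyedWindowSketch {

    record Event(String key, long timestamp, long value) {}

    // One result per (key, window); results of different keys are never combined.
    record Result(String key, long windowStart, long sum, long timestamp) {}

    static List<Result> evaluate(List<Event> events, long size) {
        // key -> (window start -> running sum), sorted for deterministic output
        Map<String, TreeMap<Long, Long>> state = new TreeMap<>();
        for (Event e : events) {
            long start = e.timestamp() - (e.timestamp() % size); // offset 0, non-negative ts
            state.computeIfAbsent(e.key(), k -> new TreeMap<>())
                 .merge(start, e.value(), Long::sum);
        }
        List<Result> results = new ArrayList<>();
        for (Map.Entry<String, TreeMap<Long, Long>> perKey : state.entrySet()) {
            for (Map.Entry<Long, Long> w : perKey.getValue().entrySet()) {
                long start = w.getKey();
                // Every key's result for this window gets the same timestamp: end - 1.
                results.add(new Result(perKey.getKey(), start, w.getValue(), start + size - 1));
            }
        }
        return results;
    }
}
```

With events for keys `"a"` and `"b"` in the same 10-second window, the sketch emits two separate results (one per key), and both carry the timestamp 9,999.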

In reply to Jinhua Luo, 2017-12-18 11:30 GMT+01:00.

Re: how flink extracts timestamp from transformed elements?

Jinhua Luo
Maybe "merge" was not the right word.
What I mean is that the downstream operator receives the results of all
logical windows, right?
Then how does Flink align the results on the timestamp? That is, if one of
the logical windows emits its result downstream, how does the downstream
operator treat the timestamp and watermark of that result?
Does it wait for the other logical windows to emit their results for the
same window?

In reply to Fabian Hueske, 2017-12-18 18:51 GMT+08:00.

Re: how flink extracts timestamp from transformed elements?

Fabian Hueske-2
Yes, that's ensured by the watermark mechanism [1].
An operator advances its watermark to the minimum of the last watermarks received on each of its input channels.
So the event time of an operator won't advance past the end time of a window until the watermarks (and results) of all window subtasks have been received.
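A minimal sketch of the rule above, assuming a hypothetical `WatermarkTracker` class (not Flink's internal implementation). Each input channel corresponds to one upstream window subtask; the operator's watermark is the minimum of the latest watermark seen on every channel and never moves backwards. The `windowComplete` check assumes the usual event-time firing condition, watermark >= window end - 1.

```java
import java.util.Arrays;

// Hypothetical tracker for an operator with several input channels.
final class WatermarkTracker {

    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // Records a watermark from one channel and returns the operator's watermark.
    long onWatermark(int channel, long watermark) {
        // A channel's watermark only moves forward.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // The slowest channel decides; the operator's watermark is monotonic.
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    // A window [start, end) is complete once the watermark reaches end - 1.
    boolean windowComplete(long windowEnd) {
        return currentWatermark >= windowEnd - 1;
    }
}
```

With two upstream subtasks, a watermark past the window end on channel 0 alone leaves the operator's watermark at `Long.MIN_VALUE`; only after channel 1 also reaches `end - 1` does the window count as complete downstream.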

In reply to Jinhua Luo, 2017-12-18 12:09 GMT+01:00.