assignTimestamp after keyBy

Dong-iL, Kim
Hi.
My stream data comes from some files (files -> kafka -> flink (source -> keyBy -> windowing)).
The data is arranged in order within each file.
I want to assign timestamps after keyBy.
How can I do that?
Regards.

Re: assignTimestamp after keyBy

pushpendra.jaiswal
Please refer to https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
for how to assign timestamps.

You can use a map after keyBy to assign timestamps.

e.g.:

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...

~Pushpendra

Re: assignTimestamp after keyBy

Dong-iL, Kim
Thanks for replying, Pushpendra.
The assignTimestamp method returns a DataStream, not a KeyedStream,
so I cannot use windowing after it.
Is it possible to cast the result to a KeyedStream?
Regards

> On Sep 8, 2016, at 3:12 PM, pushpendra.jaiswal <[hidden email]> wrote:

Re: assignTimestamp after keyBy

Dong-iL, Kim
I want to assign timestamps after keyBy
because the stream is not aligned before keyBy.
I have already tested code like yours.
It produced many warnings that timestamp monotony was violated.
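
A note on the warning above: it is presumably the "Timestamp monotony violated" message that Flink's AscendingTimestampExtractor logs when an element's timestamp is lower than one it has already seen. The plain Scala sketch below (no Flink dependency; Event, violations and the sample data are hypothetical and only for illustration) shows how two files that are each sorted can still yield a stream that is not ascending overall, while every key taken on its own stays in order.

```scala
// Hypothetical event type: the key (source file) and an event-time timestamp.
final case class Event(key: String, ts: Long)

object MonotonyDemo {
  // Counts how many elements carry a timestamp lower than the highest one
  // seen so far, i.e. how often an ascending-timestamp check would complain.
  def violations(events: Seq[Event]): Int = {
    var maxSeen = Long.MinValue
    var count = 0
    for (e <- events) {
      if (e.ts < maxSeen) count += 1 else maxSeen = e.ts
    }
    count
  }

  def main(args: Array[String]): Unit = {
    // Assumed data: two files, each sorted by timestamp internally.
    val fileA = Seq(Event("a", 1000L), Event("a", 2000L), Event("a", 3000L))
    val fileB = Seq(Event("b", 1500L), Event("b", 2500L), Event("b", 3500L))

    // One file is read after the other into the same stream.
    val mixed = fileA ++ fileB

    println(violations(mixed))                               // 2
    println(mixed.groupBy(_.key).values.map(violations).sum) // 0
  }
}
```

Being in order per key does not help here, because Flink tracks watermarks per parallel operator instance rather than per key. A watermark strategy that tolerates a bounded amount of out-of-orderness is the usual way to handle such input.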

> On Sep 8, 2016, at 4:32 PM, Dong-iL, Kim <[hidden email]> wrote:

Re: assignTimestamp after keyBy

Fabian Hueske-2
In reply to this post by Dong-iL, Kim
I would assign timestamps directly at the source.
Timestamps are not stripped off by operators.

Reassigning timestamps somewhere in the middle of a job can cause very unexpected results.
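
As a rough sketch of what assigning timestamps directly at the source could look like, assuming the Flink 1.x Scala DataStream API of this thread's era (newer releases deprecate or replace several of these calls): the MyEvent fields, the 5-second out-of-orderness bound and the fromElements stand-in for the Kafka source are all assumptions made for illustration, not part of the original job.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event type; the real MyEvent is not shown in the thread.
case class MyEvent(group: String, timestamp: Long, count: Long) {
  def add(other: MyEvent): MyEvent = copy(count = count + other.count)
}

// Reads the event-time timestamp from each element and lets watermarks
// trail the highest timestamp seen by 5 seconds (an assumed bound).
class EventTimeExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(5)) {
  override def extractTimestamp(element: MyEvent): Long = element.timestamp
}

object TimestampsAtSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Stand-in for the Kafka source; the elements are slightly out of order.
    val stream: DataStream[MyEvent] = env.fromElements(
      MyEvent("a", 1000L, 1L),
      MyEvent("b", 500L, 1L),
      MyEvent("a", 2000L, 1L))

    // Assign timestamps once, right after the source and before keyBy.
    val withTimestampsAndWatermarks: DataStream[MyEvent] =
      stream.assignTimestampsAndWatermarks(new EventTimeExtractor)

    // keyBy comes afterwards, so windowing works on a KeyedStream and the
    // timestamps assigned above are kept.
    withTimestampsAndWatermarks
      .keyBy(_.group)
      .timeWindow(Time.seconds(10))
      .reduce((a, b) => a.add(b))
      .print()

    env.execute("timestamps-at-source")
  }
}
```

With this ordering no cast to KeyedStream is needed, and out-of-order elements within the chosen bound should not trigger monotony warnings, since this extractor does not require ascending timestamps.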