Hi.
My stream data comes from files (files -> Kafka -> Flink (source -> keyBy -> windowing)). The data is ordered within each file. I want to assign timestamps after keyBy. How can I do that?
Regards.
Please refer to https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html for assigning timestamps.

You can do a map after keyBy to assign timestamps, e.g.:

```scala
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
  .filter( _.severity == WARNING )
  .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
  .keyBy( _.getGroup )
  .timeWindow(Time.seconds(10))
  .reduce( (a, b) => a.add(b) )
  .addSink(...)
```

~Pushpendra
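The snippet does not show `MyTimestampsAndWatermarks`. As a hedged illustration of what such an assigner typically does when events can arrive out of order, here is a plain-Scala model (no Flink dependency) of bounded-out-of-orderness watermarking: remember the largest timestamp seen so far and let the watermark trail it by a fixed bound. `Event` and `BoundedOutOfOrdernessSketch` are hypothetical names and the 2000 ms bound is an arbitrary assumption; Flink ships its own extractor for this pattern, whose exact watermark arithmetic may differ.

```scala
// Hypothetical event type: the thread's MyEvent is not shown.
final case class Event(key: String, timestamp: Long)

// Flink-free model of a periodic watermark assigner that tolerates events
// arriving up to `maxOutOfOrderness` ms behind the newest one seen so far.
// This is an illustration of the idea, not Flink's implementation.
final class BoundedOutOfOrdernessSketch(maxOutOfOrderness: Long) {
  // Start so that the initial watermark is Long.MinValue without overflow.
  private var maxSeen: Long = Long.MinValue + maxOutOfOrderness

  // Per element: the event's own field becomes its event time,
  // and the largest timestamp seen so far is updated.
  def extractTimestamp(e: Event): Long = {
    maxSeen = math.max(maxSeen, e.timestamp)
    e.timestamp
  }

  // Periodically: event time is assumed complete up to this value.
  def currentWatermark: Long = maxSeen - maxOutOfOrderness
}

val assigner = new BoundedOutOfOrdernessSketch(maxOutOfOrderness = 2000L)
val events = Seq(Event("a", 1000L), Event("b", 5000L), Event("a", 4000L))
val watermarks = events.map { e =>
  assigner.extractTimestamp(e)
  assigner.currentWatermark
}
// watermarks == Seq(-1000L, 3000L, 3000L)
```

An element whose timestamp is at or below the current watermark would be treated as late, so the bound has to cover how far the input can run backwards in time.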
Thanks for replying, Pushpendra.
Assigning timestamps returns a DataStream, not a KeyedStream, so I cannot apply keyed windowing to the result. Is it possible to cast it to a KeyedStream?
Regards
I want to assign timestamps after keyBy, because the stream is not ordered before keyBy. I've already tested code like yours, and it produced many "timestamp monotony violated" warnings.
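The warning is consistent with an assigner that expects ascending timestamps (Flink's `AscendingTimestampExtractor` logs a monotony-violation warning when a timestamp goes backwards) being applied to input that is only ordered per key. The plain-Scala check below, on made-up data, shows how two per-key ascending sequences stop being monotonic once they are interleaved. `Event`, `monotonyViolations`, and the sample values are hypothetical.

```scala
// Hypothetical event type and sample data, not taken from the thread.
final case class Event(key: String, timestamp: Long)

// Each key is ascending on its own, but the keys are interleaved in arrival order.
val interleaved = Seq(
  Event("a", 1000L), Event("b", 200L),
  Event("a", 2000L), Event("b", 300L),
  Event("a", 3000L), Event("b", 400L)
)

// Counts how often a timestamp is lower than its predecessor, i.e. how often
// an "ascending timestamps" assumption would be violated for this sequence.
def monotonyViolations(ts: Seq[Long]): Int =
  ts.zip(ts.drop(1)).count { case (prev, next) => next < prev }

// Across the whole stream (what an assigner before keyBy sees).
val globalViolations = monotonyViolations(interleaved.map(_.timestamp))

// Within each key separately (groupBy keeps the arrival order inside a group).
val perKeyViolations = interleaved
  .groupBy(_.key)
  .values
  .map(evts => monotonyViolations(evts.map(_.timestamp)))
  .sum
```

Since Flink tracks watermarks per stream partition rather than per key, one common approach for input like this is an out-of-orderness-tolerant assigner at the source, with a bound large enough to cover the interleaving, instead of an ascending-timestamp assigner.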
In reply to this post by Dong-iL, Kim
I would assign timestamps directly at the source. Reassigning timestamps somewhere in the middle of a job can cause very unexpected results. Timestamps are not stripped off by operators.

2016-09-08 9:32 GMT+02:00 Dong-iL, Kim <[hidden email]>:
> Thanks for replying. pushpendra.
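Watermarks assigned at the source flow through keyBy together with the timestamps, and an operator with several inputs (such as the window after keyBy) advances its event-time clock to the minimum of the watermarks it has received from its inputs, as described in Flink's event-time documentation. The following is a plain-Scala model of that rule, not Flink code; `WatermarkMerger` is a hypothetical name, and the two inputs stand in for parallel source instances.

```scala
// Hypothetical model of how a downstream operator combines watermarks
// coming from several upstream inputs (e.g. parallel source instances).
final class WatermarkMerger(numInputs: Int) {
  // Latest watermark per input; nothing has been received yet.
  private val latest = Array.fill(numInputs)(Long.MinValue)

  // Record a watermark from one input; a watermark never moves backwards.
  def onWatermark(input: Int, wm: Long): Unit =
    latest(input) = math.max(latest(input), wm)

  // The operator's event-time clock is the minimum over all inputs.
  def currentEventTime: Long = latest.min
}

val merger = new WatermarkMerger(numInputs = 2)
merger.onWatermark(0, 5000L)
val afterFirst = merger.currentEventTime  // Long.MinValue: input 1 has not reported yet
merger.onWatermark(1, 3000L)
val afterSecond = merger.currentEventTime // 3000
merger.onWatermark(1, 7000L)
val afterThird = merger.currentEventTime  // 5000: now input 0 is the slowest
```

Under this rule the slowest input holds back event time for every key, which is one reason watermarks generated once at the source, rather than regenerated mid-job, give predictable window results.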