Hi,
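I'm experimenting with a custom windowing setup over clickstream data. I want the timestamps of this clickstream data to be the timestamps 'when the event occurred', and in the windows I need to trigger on these times.

For testing I created a source roughly like this:

    public class ManualTimeEventSource extends RichEventTimeSourceFunction<Long> {
        ctx.collectWithTimestamp(event, event.timestamp);

But none of the triggers were called, so I started digging through the code. Then I figured I apparently needed to add the watermarks myself, so I added a line:

    ctx.emitWatermark(new Watermark(event.timestamp));

But now I get:

    Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
        at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:41)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:93)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:88)
        ... 9 more

This seems like a bug to me (StreamElement vs StreamRecord). Is it a bug in Flink or in my code?

What is the right way to trigger the events in my windowing setup?

P.S. I'm binding my Java application against Flink version 0.10.1

--
Best regards / Met vriendelijke groeten,

Niels Basjes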
Hi,

the problem here is that the system needs to be aware that Watermarks will be flowing through the system. You can either do this via:

    env.setStreamTimeCharacteristic(EventTime);

or:

    env.getConfig().enableTimestamps();

I know, not very intuitive.

Cheers,
Aljoscha

> On 30 Nov 2015, at 14:47, Niels Basjes wrote:
>
> Hi,
>
> I'm experimenting with a custom Windowing setup over clickstream data.
> I want the timestamps of this clickstream data to be the timestamps 'when the event occurred' and in the Windows I need to trigger on these times.
>
> For testing I created a source roughly like this:
> public class ManualTimeEventSource extends RichEventTimeSourceFunction<Long> {
> ctx.collectWithTimestamp(event, event.timestamp);
>
> But none of the triggers were called so I started digging through the code.
> Then I figured I apparently needed to add the watermarks myself, so I added a line:
> ctx.emitWatermark(new Watermark(event.timestamp));
>
> But now I get:
>
> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:41)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:93)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:88)
>     ... 9 more
>
> This seems like a bug to me (StreamElement vs StreamRecord). Is it a bug in Flink or in my code?
>
> What is the right way to trigger the events in my Windowing setup?
>
> P.S. I'm binding my Java application against Flink version 0.10.1
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
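For readers who land on this thread with the same exception, here is a rough, Flink-free sketch of why the cast in the stack trace can fail. It is only a simplified model, not Flink's actual code: every class name below (DataElement, MarkElement, RecordOnlySerializer, MultiplexingSerializer, WatermarkDemo) is hypothetical, and the idea that enabling timestamps swaps in a serializer that understands watermarks is an assumption drawn from the reply above and from the StreamRecordSerializer frame in the trace.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model only; none of these classes are Flink classes.
final class WatermarkDemo {
    // Assumption: the timestamp setting decides which serializer is wired in.
    static ElementSerializer serializerFor(boolean timestampsEnabled) {
        return timestampsEnabled ? new MultiplexingSerializer() : new RecordOnlySerializer();
    }

    // Sends one data record followed by one watermark through the serializer.
    static List<String> run(boolean timestampsEnabled) {
        ElementSerializer serializer = serializerFor(timestampsEnabled);
        List<String> out = new ArrayList<>();
        out.add(serializer.serialize(new DataElement(42L, 1000L)));
        out.add(serializer.serialize(new MarkElement(1000L)));
        return out;
    }

    public static void main(String[] args) {
        System.out.println("timestamps enabled:  " + run(true));
        try {
            run(false);
        } catch (ClassCastException e) {
            System.out.println("timestamps disabled: " + e.getClass().getSimpleName());
        }
    }
}

// Common parent, playing the role that StreamElement has in the question.
abstract class StreamItem {}

// Stand-in for a data record that carries a value and an event timestamp.
final class DataElement extends StreamItem {
    final long value;
    final long timestamp;
    DataElement(long value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Stand-in for a watermark.
final class MarkElement extends StreamItem {
    final long timestamp;
    MarkElement(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface ElementSerializer {
    String serialize(StreamItem item);
}

// Assumes every item is a data record, so a watermark fails the cast.
final class RecordOnlySerializer implements ElementSerializer {
    public String serialize(StreamItem item) {
        DataElement record = (DataElement) item; // ClassCastException for MarkElement
        return "R:" + record.value;
    }
}

// Checks the item type first, so records and watermarks can share a channel.
final class MultiplexingSerializer implements ElementSerializer {
    public String serialize(StreamItem item) {
        if (item instanceof MarkElement) {
            return "W:" + ((MarkElement) item).timestamp;
        }
        DataElement record = (DataElement) item;
        return "R:" + record.value + "@" + record.timestamp;
    }
}
```

In the real job the fix is the one given in the reply: call env.setStreamTimeCharacteristic(EventTime) or env.getConfig().enableTimestamps() on the execution environment before building the topology, so that the source's emitWatermark calls are accepted downstream.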
Thanks. That works great.

Niels

On Mon, Nov 30, 2015 at 3:32 PM, Aljoscha Krettek wrote:
> Hi,

--
Best regards / Met vriendelijke groeten,

Niels Basjes