Triggering events

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Triggering events

Niels Basjes
Hi,

I'm experimenting with a custom Windowing setup over clickstream data.
I want the timestamps of this clickstream data to be the timestamps 'when the event occurred' and in the Windows I need to trigger on these times.

For testing I created a source roughly like this:
    public class ManualTimeEventSource extends RichEventTimeSourceFunction<Long> {
                    ctx.collectWithTimestamp(event, event.timestamp);

But none of the triggers were called so I started digging through the code.
Then I figured I apparently needed to add the watermarks myself, so I added a line:
                    ctx.emitWatermark(new Watermark(event.timestamp));

But now I get:

Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:41)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:93)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:88)
... 9 more

This seems like a bug to me (StreamElement vs StreamRecord). Is it a bug in Flink or in my code?

What is the right way to trigger the events in my Windowing setup?



P.S. I'm binding my Java application against Flink version 0.10.1

--
Best regards / Met vriendelijke groeten,

Niels Basjes
Reply | Threaded
Open this post in threaded view
|

Re: Triggering events

Aljoscha Krettek
Hi,
the problem here is that the system needs to be aware that Watermarks will be flowing through the system. You can either do this via:

env.setStreamTimeCharacteristic(EventTime);

or:

env.getConfig().enableTimestamps();

I know, not very intuitive.

Cheers,
Aljoscha

> On 30 Nov 2015, at 14:47, Niels Basjes <[hidden email]> wrote:
>
> Hi,
>
> I'm experimenting with a custom Windowing setup over clickstream data.
> I want the timestamps of this clickstream data to be the timestamps 'when the event occurred' and in the Windows I need to trigger on these times.
>
> For testing I created a source roughly like this:
>     public class ManualTimeEventSource extends RichEventTimeSourceFunction<Long> {
>                     ctx.collectWithTimestamp(event, event.timestamp);
>
> But none of the triggers were called so I started digging through the code.
> Then I figured I apparently needed to add the watermarks myself, so I added a line:
>                     ctx.emitWatermark(new Watermark(event.timestamp));
>
> But now I get:
>
> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:41)
> at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:93)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:88)
> ... 9 more
>
> This seems like a bug to me (StreamElement vs StreamRecord). Is it a bug in Flink or in my code?
>
> What is the right way to trigger the events in my Windowing setup?
>
>
>
> P.S. I'm binding my Java application against Flink version 0.10.1
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes

Reply | Threaded
Open this post in threaded view
|

Re: Triggering events

Niels Basjes
Thanks. 
That works great.

Niels

On Mon, Nov 30, 2015 at 3:32 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
the problem here is that the system needs to be aware that Watermarks will be flowing through the system. You can either do this via:

env.setStreamTimeCharacteristic(EventTime);

or:

env.getConfig().enableTimestamps();

I know, not very intuitive.

Cheers,
Aljoscha

> On 30 Nov 2015, at 14:47, Niels Basjes <[hidden email]> wrote:
>
> Hi,
>
> I'm experimenting with a custom Windowing setup over clickstream data.
> I want the timestamps of this clickstream data to be the timestamps 'when the event occurred' and in the Windows I need to trigger on these times.
>
> For testing I created a source roughly like this:
>     public class ManualTimeEventSource extends RichEventTimeSourceFunction<Long> {
>                     ctx.collectWithTimestamp(event, event.timestamp);
>
> But none of the triggers were called so I started digging through the code.
> Then I figured I apparently needed to add the watermarks myself, so I added a line:
>                     ctx.emitWatermark(new Watermark(event.timestamp));
>
> But now I get:
>
> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>       at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:41)
>       at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>       at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
>       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
>       at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:93)
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:88)
>       ... 9 more
>
> This seems like a bug to me (StreamElement vs StreamRecord). Is it a bug in Flink or in my code?
>
> What is the right way to trigger the events in my Windowing setup?
>
>
>
> P.S. I'm binding my Java application against Flink version 0.10.1
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes




--
Best regards / Met vriendelijke groeten,

Niels Basjes