java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Kostya Kulagin
Hi guys,

I'm trying to run this example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
        @Override
        public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
            LongStream.range(0, 33).forEach(l -> {
                ctx.collect(Tuple2.of(0L, "This is " + l));
            });
        }

        @Override
        public void cancel() {
        }
    });

    source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()).
    // source.
        keyBy(0)
        .window(GlobalWindows.create())
        .trigger(PurgingTrigger.of(CountTrigger.of(5)))
        .apply(new WindowFunction<Tuple2<Long, String>, Void, Tuple, GlobalWindow>() {
            @Override
            public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<Long, String>> input, Collector<Void> out) throws Exception {
                System.out.println("!!!!!!!!! " + Joiner.on(",").join(input));
            }
        });

    env.execute("yoyoyo");

I'm getting:

    Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
        at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)


- After googling, I found this: https://issues.apache.org/jira/browse/FLINK-3688
- I went to GitHub and downloaded branch 1.0.2, which contains the specified change, but I get the same result.

What am I missing here?

Thanks!
Konstantin

Re: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Fabian Hueske-2
Hi Konstantin,

This exception is thrown if you assign timestamps and watermarks without setting the time characteristic to event time.
Please try adding

> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

right after you obtain the StreamExecutionEnvironment.

Best, Fabian
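
To make the failure mode concrete, here is a toy model in plain Java. It is a sketch under stated assumptions, not Flink's actual code: every class and method name below is a hypothetical stand-in. The stack trace shows a record-only serializer being handed a watermark; the sketch assumes the serializer is chosen from a single flag (standing in for the time characteristic) and shows why a record-only serializer fails as soon as a watermark reaches it, while one that handles both element kinds does not.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkCastSketch {

    // Hypothetical stand-ins for the two kinds of elements travelling in a stream.
    static class StreamRecord {
        final String value;
        StreamRecord(String value) { this.value = value; }
    }

    static class Watermark {
        final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
    }

    interface ElementSerializer {
        String serialize(Object element);
    }

    // Models a serializer that assumes only records will ever be emitted.
    static class RecordOnlySerializer implements ElementSerializer {
        @Override
        public String serialize(Object element) {
            // This cast throws ClassCastException when a Watermark arrives.
            StreamRecord record = (StreamRecord) element;
            return "record:" + record.value;
        }
    }

    // Models a serializer that can handle both records and watermarks.
    static class MultiplexingSerializer implements ElementSerializer {
        @Override
        public String serialize(Object element) {
            if (element instanceof Watermark) {
                return "watermark:" + ((Watermark) element).timestamp;
            }
            return "record:" + ((StreamRecord) element).value;
        }
    }

    // Assumption: the serializer is picked from one flag that stands in for
    // "the time characteristic is set to event time".
    static ElementSerializer serializerFor(boolean eventTimeEnabled) {
        return eventTimeEnabled ? new MultiplexingSerializer() : new RecordOnlySerializer();
    }

    // Emits one record followed by one watermark, as a timestamp assigner would.
    public static List<String> emit(boolean eventTimeEnabled) {
        ElementSerializer serializer = serializerFor(eventTimeEnabled);
        List<String> out = new ArrayList<>();
        out.add(serializer.serialize(new StreamRecord("This is 0")));
        out.add(serializer.serialize(new Watermark(42L)));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emit(true));
        try {
            emit(false);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException without event time");
        }
    }
}
```

In this model, calling `emit(false)` plays the role of the original program (watermarks generated, time characteristic left at its default), and `emit(true)` plays the role of the program after adding the `setStreamTimeCharacteristic` call.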

In reply to Konstantin Kulagin's message of 2016-04-22 15:47 GMT+02:00.

Re: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

rmetzger0
I've filed a JIRA to improve the error message: https://issues.apache.org/jira/browse/FLINK-3918

In reply to Fabian Hueske's message of Fri, Apr 22, 2016 at 11:17 PM.