Streaming Exception error message Explanation

subashbasnet
Hello all,

I have been trying to read stock data as a stream and perform outlier detection on it.
My main problem is that the DataStream API has no equivalent of 'withBroadcastSet()', so I used a global variable (loop) and DataStreamUtils to read it. But I get cast exceptions and other errors. Could you please suggest another way to implement the operation shown in the code, without using loop as a global variable?
Below is a snapshot of my code, followed by the exception messages.

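import java.util.Collection;
import java.util.Iterator;

import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.ConnectedIterativeStreams;
import org.apache.flink.streaming.api.datastream.DataStream;

// Global (static) handle to the iteration, read by SelectNearestCenter in open().
private static ConnectedIterativeStreams<Centroid, Centroid> loop;
.........
public static void main(String[] args) throws Exception {
    // newDataStream and newCentroidDataStream are created in the elided code above.
    DataStream<Point> points = newDataStream.map(new getPoints());
    DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
    // The argument of iterate() is the maximum time in milliseconds to wait for feedback, not an iteration count.
    loop = centroids.iterate(10).withFeedbackType(Centroid.class);
    // Assign every point to its nearest centroid: (centroid id, point).
    DataStream<Tuple2<String, Point>> newCentroids = points.map(new SelectNearestCenter());
}

@ForwardedFields("*->1")
public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<String, Point>> {
    private Collection<Centroid> centroids;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Collects the iteration's first input (the initial centroids) into a local list.
        // Note: this references a DataStream from inside an operator, which Flink does not support.
        Iterator<Centroid> iter = DataStreamUtils.collect(loop.getFirstInput());
        this.centroids = Lists.newArrayList(iter);
    }

    @Override
    public Tuple2<String, Point> map(Point p) throws Exception {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";

        // check all cluster centers
        for (Centroid centroid : centroids) {
            // compute distance
            double distance = p.euclideanDistance(centroid);

            // update nearest cluster if necessary
            if (distance < minDistance) {
                minDistance = distance;
                closestCentroidId = centroid.id;
            }
        }

        // emit a new record with the center id and the data point
        return new Tuple2<String, Point>(closestCentroidId, p);
    }
}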


Could you please explain the following exception?

14:23:56,269 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught exception while processing timer.
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:322)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323)
at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:730)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370)
at org.apache.flink.streaming.api.operators.StreamMap.processWatermark(StreamMap.java:44)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:319)
... 10 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:1)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
... 13 more
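Reading the trace from the bottom up, the root cause is the ClassCastException in StreamRecordSerializer.serialize: when the WindowOperator's timer fired, it emitted a Watermark, which the chained StreamMap forwarded to a network output (RecordWriterOutput.emitWatermark, then RecordWriter.broadcastEmit) whose serializer only handles StreamRecord objects. Watermark and StreamRecord are both stream elements, but neither is a subtype of the other, so the downcast fails. The toy model below reproduces only that cast failure, using hypothetical stand-in classes rather than Flink's real ones; it makes no claim about why this particular output edge ended up with a record-only serializer.

```java
/**
 * Toy model of the root cause in the trace above. The nested classes are
 * hypothetical stand-ins, not Flink's StreamRecord/Watermark/StreamRecordSerializer.
 */
public class WatermarkCastDemo {

    // Common supertype: data records and watermarks travel on the same channel.
    abstract static class StreamElement {}

    static final class StreamRecord extends StreamElement {
        final Object value;

        StreamRecord(Object value) {
            this.value = value;
        }
    }

    static final class Watermark extends StreamElement {
        final long timestamp;

        Watermark(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    // A serializer that only understands records: it blindly downcasts every element.
    static String serializeRecordOnly(StreamElement element) {
        StreamRecord record = (StreamRecord) element; // throws ClassCastException for a Watermark
        return "record:" + record.value;
    }

    public static void main(String[] args) {
        System.out.println(serializeRecordOnly(new StreamRecord("tick")));
        try {
            serializeRecordOnly(new Watermark(42L));
        } catch (ClassCastException e) {
            // Same kind of failure as the root cause in the trace: a Watermark is not a StreamRecord.
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```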

As a result, I then get the following error messages:
14:23:58,310 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
java.lang.NullPointerException
at org.apache.flink.contrib.streaming.CollectSink.closeConnection(CollectSink.java:100)
at org.apache.flink.contrib.streaming.CollectSink.close(CollectSink.java:131)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:347)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:294)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
at java.lang.Thread.run(Thread.java:745)

Sink: Unnamed (1/1) switched to FAILED with exception.
java.lang.RuntimeException: java.net.ConnectException: Connection refused
at org.apache.flink.contrib.streaming.CollectSink.initializeConnection(CollectSink.java:73)
at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:123)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.<init>(Socket.java:434)
at java.net.Socket.<init>(Socket.java:244)
at org.apache.flink.contrib.streaming.CollectSink.initializeConnection(CollectSink.java:69)


Best Regards,
Subash Basnet

Re: Streaming Exception error message Explanation

Márton Balassi
Hi Subash,

Unfortunately, you cannot reference a DataStream (loop) from within a Flink operator. To handle both the regular input and the feedback data, I suggest using co-operators (e.g. a CoMapFunction or CoFlatMapFunction on connected streams). Have a look at this mockup I did some time ago for a conceptually similar problem. [1]
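The co-operator suggestion can be sketched as follows. In Flink, the centroids would arrive as a second input of the operator instead of being read from a DataStream inside open(), e.g. points.connect(centroids.broadcast()).flatMap(...) with a CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>>; broadcast() sends every centroid to every parallel instance, which is the closest streaming counterpart to withBroadcastSet(). For the iterative version, the same function could presumably be applied to points.iterate().withFeedbackType(Centroid.class), with updated centroids returned through closeWith(...). To stay runnable without Flink on the classpath, the block below models only the operator logic in plain Java: flatMap1/flatMap2 mirror CoFlatMapFunction's two methods, a List stands in for Flink's Collector, and Point/Centroid are hypothetical minimal types because the original post does not show them. It is a sketch under those assumptions, not the mockup referenced in [1].

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of a co-operator that assigns points to their nearest centroid.
 * flatMap1/flatMap2 mirror Flink's CoFlatMapFunction, but a List replaces the Collector.
 */
public class NearestCentroidCoOperator {

    // Hypothetical minimal types: the original post does not show Point or Centroid.
    static class Point {
        final double x;
        final double y;

        Point(double x, double y) {
            this.x = x;
            this.y = y;
        }

        double euclideanDistance(Point other) {
            double dx = x - other.x;
            double dy = y - other.y;
            return Math.sqrt(dx * dx + dy * dy);
        }
    }

    static final class Centroid extends Point {
        final String id;

        Centroid(String id, double x, double y) {
            super(x, y);
            this.id = id;
        }
    }

    // Operator state: the latest centroid per id, replacing the global "loop" DataStream.
    private final Map<String, Centroid> centroids = new HashMap<>();

    // First input (points): emit (closest centroid id, point), or "-1" if no centroid is known yet.
    void flatMap1(Point p, List<Map.Entry<String, Point>> out) {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";
        for (Centroid centroid : centroids.values()) {
            double distance = p.euclideanDistance(centroid);
            if (distance < minDistance) {
                minDistance = distance;
                closestCentroidId = centroid.id;
            }
        }
        out.add(new AbstractMap.SimpleEntry<String, Point>(closestCentroidId, p));
    }

    // Second input (initial or fed-back centroids): only updates state, emits nothing.
    void flatMap2(Centroid c, List<Map.Entry<String, Point>> out) {
        centroids.put(c.id, c);
    }

    public static void main(String[] args) {
        NearestCentroidCoOperator op = new NearestCentroidCoOperator();
        List<Map.Entry<String, Point>> out = new ArrayList<>();
        op.flatMap2(new Centroid("a", 0, 0), out);
        op.flatMap2(new Centroid("b", 10, 10), out);
        op.flatMap1(new Point(9, 8), out);
        System.out.println(out.get(0).getKey()); // prints b
    }
}
```

Because the two inputs of connected streams are not ordered relative to each other, points that arrive before the first centroid get the id "-1" here, matching the default in SelectNearestCenter; a real job might buffer such points instead.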


On Wed, Jul 6, 2016 at 3:35 PM, subash basnet <[hidden email]> wrote:
loop