Hello all,
I have been trying to read stock data as a stream and perform outlier detection on it. My problem is mainly due to the absence of 'withBroadcastSet()' in the DataStream API, so I used a global variable and DataStreamUtils to read the variable loop. But I get ClassCastExceptions and other errors. Could you please suggest another way to do the operation shown in the code, without using loop as a global variable? Below is a snapshot of my code, followed by the exception messages.

private static ConnectedIterativeStreams<Centroid, Centroid> loop;
.........
main() {
    DataStream<Point> points = newDataStream.map(new getPoints());
    DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
    loop = centroids.iterate(10).withFeedbackType(Centroid.class);
    DataStream<Tuple2<String, Point>> newCentroids = points.map(new SelectNearestCenter());
}

@ForwardedFields("*->1")
public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<String, Point>> {
    private Collection<Centroid> centroids;

    @Override
    public void open(Configuration parameters) throws Exception {
        Iterator<Centroid> iter = DataStreamUtils.collect(loop.getFirstInput());
        this.centroids = Lists.newArrayList(iter);
    }

    @Override
    public Tuple2<String, Point> map(Point p) throws Exception {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";
        // check all cluster centers
        for (Centroid centroid : centroids) {
            // compute distance
            double distance = p.euclideanDistance(centroid);
            // update nearest cluster if necessary
            if (distance < minDistance) {
                minDistance = distance;
                closestCentroidId = centroid.id;
            }
        }
        // emit a new record with the center id and the data point
        return new Tuple2<String, Point>(closestCentroidId, p);
    }
}

Could you please explain the following exception?

14:23:56,269 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Caught exception while processing timer.
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:322)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:730)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370)
    at org.apache.flink.streaming.api.operators.StreamMap.processWatermark(StreamMap.java:44)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:319)
    ... 10 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:1)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
    ... 13 more

As a result, I get the error message below:

14:23:58,310 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator - Exception while closing user function while failing or canceling task
java.lang.NullPointerException
    at org.apache.flink.contrib.streaming.CollectSink.closeConnection(CollectSink.java:100)
    at org.apache.flink.contrib.streaming.CollectSink.close(CollectSink.java:131)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:347)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:294)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
    at java.lang.Thread.run(Thread.java:745)
Sink: Unnamed (1/1) switched to FAILED with exception.
java.lang.RuntimeException: java.net.ConnectException: Connection refused
    at org.apache.flink.contrib.streaming.CollectSink.initializeConnection(CollectSink.java:73)
    at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:123)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:244)
    at org.apache.flink.contrib.streaming.CollectSink.initializeConnection(CollectSink.java:69)

Best Regards,
Subash Basnet
Hi Subash,

Unfortunately, you cannot reference a DataStream (loop) from within a Flink operator. To handle both the regular input and the feedback data, I suggest using CoOperators. Have a look at this mockup I did some time ago for a conceptually similar problem. [1]
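To illustrate the co-operator idea, here is a minimal sketch. It makes a few assumptions. The `Point` and `Centroid` classes are simplified, hypothetical stand-ins for the ones in the question. Plain Java replaces the Flink runtime so the sketch runs on its own. `flatMap1` and `flatMap2` only mirror the two-method shape of Flink's `CoFlatMapFunction`. Points arrive on the first input and centroid updates on the second. The current centroids are held in an ordinary field of the operator rather than being read from another DataStream in `open()`. The Flink wiring in the comments is indicative only and is not compiled here; `centroidUpdates` is a hypothetical stream name.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the Point/Centroid types in the question.
class Point {
    final double x;
    final double y;

    Point(double x, double y) {
        this.x = x;
        this.y = y;
    }

    double euclideanDistance(Point other) {
        double dx = x - other.x;
        double dy = y - other.y;
        return Math.sqrt(dx * dx + dy * dy);
    }
}

class Centroid extends Point {
    final String id;

    Centroid(String id, double x, double y) {
        super(x, y);
        this.id = id;
    }
}

// Mirrors the two-input shape of a CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>>.
// In a Flink job (not compiled here) it could be wired roughly as
//   points.connect(centroidUpdates.broadcast()).flatMap(...)
// or applied to the ConnectedIterativeStreams returned by
//   points.iterate().withFeedbackType(Centroid.class)
// and the loop closed with closeWith(...).
class NearestCentroidCoOperator {
    // Current centroids keyed by id, held by the operator instance itself.
    // Note: a plain field like this is not part of Flink's checkpointed state.
    private final Map<String, Centroid> centroids = new HashMap<>();

    // Input 1 (regular data): emit (nearest centroid id, point).
    void flatMap1(Point p, List<Map.Entry<String, Point>> out) {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1"; // same default as the original map()
        for (Centroid c : centroids.values()) {
            double distance = p.euclideanDistance(c);
            if (distance < minDistance) {
                minDistance = distance;
                closestCentroidId = c.id;
            }
        }
        out.add(new AbstractMap.SimpleEntry<String, Point>(closestCentroidId, p));
    }

    // Input 2 (broadcast or feedback data): replace the centroid with this id; emits nothing.
    void flatMap2(Centroid c, List<Map.Entry<String, Point>> out) {
        centroids.put(c.id, c);
    }
}

class CoOperatorDemo {
    public static void main(String[] args) {
        NearestCentroidCoOperator op = new NearestCentroidCoOperator();
        List<Map.Entry<String, Point>> out = new ArrayList<>();

        op.flatMap1(new Point(1, 1), out);           // no centroid known yet
        op.flatMap2(new Centroid("a", 0, 0), out);
        op.flatMap2(new Centroid("b", 10, 10), out);
        op.flatMap1(new Point(1, 1), out);           // closest is "a"
        op.flatMap2(new Centroid("a", 20, 20), out); // update moves "a" away
        op.flatMap1(new Point(1, 1), out);           // closest is now "b"

        // prints -1, a, b on separate lines
        for (Map.Entry<String, Point> e : out) {
            System.out.println(e.getKey());
        }
    }
}
```

The two inputs of a connected stream are not synchronized, so a point can arrive before any centroid. In that case the sketch emits "-1", as the original `map` did; buffering such points until centroids arrive would be an alternative. Separately, in the DataStream API the argument of `iterate(long)` is the maximum time in milliseconds that the iteration head waits for feedback, not an iteration count.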