 
	
					
		
	
Hello all,

I have been trying to read stock data as a stream and perform outlier detection on it. Because the DataStream API has no 'withBroadcastSet()', I used a global variable and DataStreamUtils to read the variable loop, but I get cast exceptions and other errors.

Could you please suggest another way to perform the operation shown in the code, without using loop as a global variable? Below is a snapshot of my code, followed by the exception messages.

    private static ConnectedIterativeStreams<Centroid, Centroid> loop;
    .........
    main(){
        DataStream<Point> points = newDataStream.map(new getPoints());
        DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
        loop = centroids.iterate(10).withFeedbackType(Centroid.class);
        DataStream<Tuple2<String, Point>> newCentroids = points.map(new SelectNearestCenter());
    }

    @ForwardedFields("*->1")
    public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<String, Point>> {
        private Collection<Centroid> centroids;

        @Override
        public void open(Configuration parameters) throws Exception {
            Iterator<Centroid> iter = DataStreamUtils.collect(loop.getFirstInput());
            this.centroids = Lists.newArrayList(iter);
        }

        @Override
        public Tuple2<String, Point> map(Point p) throws Exception {
            double minDistance = Double.MAX_VALUE;
            String closestCentroidId = "-1";
            // check all cluster centers
            for (Centroid centroid : centroids) {
                // compute distance
                double distance = p.euclideanDistance(centroid);
                // update nearest cluster if necessary
                if (distance < minDistance) {
                    minDistance = distance;
                    closestCentroidId = centroid.id;
                }
            }
            // emit a new record with the center id and the data point.
            return new Tuple2<String, Point>(closestCentroidId, p);
        }
    }

Could you please explain the following exception?

    14:23:56,269 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught exception while processing timer.
    org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:322)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:730)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370)
        at org.apache.flink.streaming.api.operators.StreamMap.processWatermark(StreamMap.java:44)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:319)
        ... 10 more
    Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
        at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:1)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
        ... 13 more

As a result, I also get the error messages below:

    14:23:58,310 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
    java.lang.NullPointerException
        at org.apache.flink.contrib.streaming.CollectSink.closeConnection(CollectSink.java:100)
        at org.apache.flink.contrib.streaming.CollectSink.close(CollectSink.java:131)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:347)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:294)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
        at java.lang.Thread.run(Thread.java:745)

    Sink: Unnamed (1/1) switched to FAILED with exception.
    java.lang.RuntimeException: java.net.ConnectException: Connection refused
        at org.apache.flink.contrib.streaming.CollectSink.initializeConnection(CollectSink.java:73)
        at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:123)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.net.ConnectException: Connection refused
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at java.net.Socket.connect(Socket.java:538)
        at java.net.Socket.<init>(Socket.java:434)
        at java.net.Socket.<init>(Socket.java:244)
        at org.apache.flink.contrib.streaming.CollectSink.initializeConnection(CollectSink.java:69)

Best Regards,
Subash Basnet
 
	
					
		
	
Hi Subash,

Unfortunately, you cannot reference a DataStream (loop) from within a Flink operator. To handle both the regular and the feedback data, I suggest using CoOperators. Have a look at this mockup I did some time ago for a conceptually similar problem. [1]
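To illustrate the co-operator idea, here is a minimal sketch in plain Java, so it runs without Flink on the classpath. It is not the mockup referenced as [1]. The class name NearestCentroidCoMap, the simplified Point and Centroid types, and the use of Consumer<String> in place of Flink's Collector are all assumptions made for illustration. The two methods mirror the shape of Flink's CoFlatMapFunction (flatMap1 for the first input, flatMap2 for the second): the operator keeps the centroids as its own state and updates them as they arrive, rather than reading another stream inside open().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical plain-Java stand-in for a CoFlatMapFunction<Point, Centroid, String>.
class NearestCentroidCoMap {

    // Simplified stand-ins for the Point and Centroid types from the question.
    static class Point {
        final double x, y;
        Point(double x, double y) { this.x = x; this.y = y; }
        double euclideanDistance(Point other) {
            return Math.hypot(x - other.x, y - other.y);
        }
    }

    static class Centroid extends Point {
        final String id;
        Centroid(String id, double x, double y) { super(x, y); this.id = id; }
    }

    // Operator-local state: the latest centroid seen for each id.
    private final Map<String, Centroid> centroids = new LinkedHashMap<>();

    // First input (points): emit the id of the nearest known centroid,
    // or "-1" if no centroid has arrived yet.
    public void flatMap1(Point p, Consumer<String> out) {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";
        for (Centroid c : centroids.values()) {
            double distance = p.euclideanDistance(c);
            if (distance < minDistance) {
                minDistance = distance;
                closestCentroidId = c.id;
            }
        }
        out.accept(closestCentroidId);
    }

    // Second input (centroids, including fed-back updates): refresh the state.
    // Nothing is emitted here.
    public void flatMap2(Centroid c, Consumer<String> out) {
        centroids.put(c.id, c);
    }

    public static void main(String[] args) {
        NearestCentroidCoMap op = new NearestCentroidCoMap();
        List<String> results = new ArrayList<>();
        op.flatMap2(new Centroid("c1", 0, 0), results::add);
        op.flatMap2(new Centroid("c2", 10, 10), results::add);
        op.flatMap1(new Point(1, 1), results::add);
        System.out.println(results); // prints [c1]
    }
}
```

In an actual Flink job, the same logic would live in a class implementing CoFlatMapFunction and be wired up with something like points.connect(centroids.broadcast()).flatMap(...), so that every parallel instance receives all centroids. How the feedback edge is closed depends on the job and is not shown here.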
 
	

 
	
	
		
