Yes I did open a socket with netcat. Turns out my first error was due
to a stream without a sink triggering the socket connect and (I
thought that without a sink the stream wouldn't affect anything so I
didn't comment it out, and I didn't open the socket for that port).
However
I did play with it some more and I think the real issue is that I'm
trying to have two streams, one write to a port and another read from
the same port. i.e.
val y = executionEnvironment.socketTextStream("localhost", 9000)
x.writeToSocket("localhost", 9000, new SimpleStringSchema())
Once I tested just write or just the read it worked, but combined I
get this error:
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java: 210)
at java.net.SocketInputStream.read(SocketInputStream.java: 141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java: 284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java: 326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java: 184)
at java.io.BufferedReader.read1(BufferedReader.java:210)
at java.io.BufferedReader.read(BufferedReader.java:286)
at java.io.Reader.read(Reader.java:140)
at org.apache.flink.streaming.api.functions.source. SocketTextStreamFunction.run( SocketTextStreamFunction.java: 101)
at org.apache.flink.streaming.api.operators.StreamSource. Is this operation not allowed?run(StreamSource.java:80)
at org.apache.flink.streaming.api.operators.StreamSource. run(StreamSource.java:53)
at org.apache.flink.streaming.runtime.tasks. SourceStreamTask.run( SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask. invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task. java:642)
at java.lang.Thread.run(Thread.java:745)
And I'm mainly writing to the same socket in order to pass work back
and forth between streams.
Free forum by Nabble | Edit this page |