Re: Connection refused error when writing to socket?

Posted by Till Rohrmann on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Connection-refused-error-when-writing-to-socket-tp11372p11408.html

Hi Li Peng,

I think what you're trying to do won't work. The problem is that you have two TCP clients (sink and source) which are supposed to connect to each other. Without a server which buffers the incoming data and forwards it to the outgoing connections, it won't be possible to read the previously written data.

The exception you're observing originates from the fact that the source tries to connect to the TCP port 9000 which is not open (since there is no server listening on this port). The same would happen to the sink.

Cheers,
TillĀ 

On Tue, Jan 31, 2017 at 9:04 PM, Li Peng <[hidden email]> wrote:
Yes I did open a socket with netcat. Turns out my first error was due
to a stream without a sink triggering the socket connect and (I
thought that without a sink the stream wouldn't affect anything so I
didn't comment it out, and I didn't open the socket for that port).
However

I did play with it some more and I think the real issue is that I'm
trying to have two streams, one write to a port and another read from
the same port. i.e.

val y = executionEnvironment.socketTextStream("localhost", 9000)
x.writeToSocket("localhost", 9000, new SimpleStringSchema())

Once I tested just write or just the read it worked, but combined I
get this error:

java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.read1(BufferedReader.java:210)
at java.io.BufferedReader.read(BufferedReader.java:286)
at java.io.Reader.read(Reader.java:140)
at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:101)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.run(Thread.java:745)

Is this operation not allowed?

And I'm mainly writing to the same socket in order to pass work back
and forth between streams.