Connection refused error when writing to socket?


Li Peng
Hi there,

I'm trying to test a couple of things by having one stream write to a
socket and another stream read from that same socket, but it keeps
failing to connect:

Caused by: java.net.ConnectException: Connection refused (Connection refused)

at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.run(Thread.java:745)

I tried writeToSocket(parameterTool.get("localhost"),
parameterTool.getInt(9000), new SimpleStringSchema), and even a custom
sink:

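import java.io.{BufferedWriter, OutputStreamWriter, PrintWriter}
import java.net.{InetAddress, Socket}

addSink(w => {
  // Opens a new client connection to localhost:9000 for every record
  val ia = InetAddress.getByName("localhost")
  val socket = new Socket(ia, 9000)
  val outStream = socket.getOutputStream
  val out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(outStream)))
  out.println(w)
  out.flush()
  out.close() // closing the writer also closes the socket
})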

But none of this seems to work. I'm fairly sure I set up the server
correctly, since I can connect to it via telnet and other dummy
echo clients I wrote. I can also have my data stream read from that
same socket without any issues, but I can't seem to tell my stream to
write to this socket without the above connection refused error
showing up. Is there some nuance here I'm missing?

Thanks!

Re: Connection refused error when writing to socket?

Jonas Gröger
Can you try opening a socket with netcat on localhost?

nc -lk 9000

and see if this works? For me this works.
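
If it helps, the same check can be done from the JVM right before the job starts. This is only a minimal sketch: PortProbe and isListening are hypothetical names, not part of Flink, and the 1000 ms default timeout is an arbitrary assumption.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper (not part of Flink): checks whether anything accepts
// TCP connections on host:port.
object PortProbe {
  def isListening(host: String, port: Int, timeoutMs: Int = 1000): Boolean = {
    val socket = new Socket()
    try {
      // Throws ConnectException ("Connection refused") if nothing listens there
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false
    } finally {
      socket.close()
    }
  }
}
```

With `nc -lk 9000` running, `PortProbe.isListening("localhost", 9000)` should return true; if it returns false, the socket source or sink would be expected to fail with the same connection refused error.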

-- Jonas

Re: Connection refused error when writing to socket?

Li Peng
Yes, I did open a socket with netcat. It turns out my first error was
due to a stream without a sink triggering the socket connect (I
thought that without a sink the stream wouldn't affect anything, so I
didn't comment it out, and I didn't open the socket for that port).

However, I did play with it some more, and I think the real issue is
that I'm trying to have two streams, one writing to a port and another
reading from the same port, i.e.

val y = executionEnvironment.socketTextStream("localhost", 9000)
x.writeToSocket("localhost", 9000, new SimpleStringSchema())

When I tested just the write or just the read, it worked, but combined
I get this error:

java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.read1(BufferedReader.java:210)
at java.io.BufferedReader.read(BufferedReader.java:286)
at java.io.Reader.read(Reader.java:140)
at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:101)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.run(Thread.java:745)

Is this operation not allowed?

And I'm mainly writing to the same socket in order to pass work back
and forth between streams.

Re: Connection refused error when writing to socket?

Till Rohrmann
Hi Li Peng,

I think what you're trying to do won't work. The problem is that you have two TCP clients (sink and source) which are supposed to connect to each other. Without a server which buffers the incoming data and forwards it to the outgoing connections, it won't be possible to read the previously written data.

The exception you're observing originates from the fact that the source tries to connect to the TCP port 9000 which is not open (since there is no server listening on this port). The same would happen to the sink.
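
To illustrate what such a server could look like, here is a minimal sketch using only java.net. LineRelay is a hypothetical helper and not part of Flink. It assumes line-based text and forwards each received line to every other connected client. Note that it does not buffer: lines that arrive before a reader has connected are dropped, so a real setup would need more than this.

```scala
import java.io.{BufferedReader, IOException, InputStreamReader, PrintWriter}
import java.net.{ServerSocket, Socket}
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper (not part of Flink): a TCP server that relays every
// line received from one client to all other connected clients.
class LineRelay(port: Int) {
  private val server = new ServerSocket(port)
  private val clients = new CopyOnWriteArrayList[(Socket, PrintWriter)]()

  // Actual port, useful when the relay was created with port 0
  def localPort: Int = server.getLocalPort

  // Accepts connections on a daemon thread until stop() closes the server
  def start(): Unit = daemon {
    try {
      while (true) handle(server.accept())
    } catch {
      case _: IOException => () // server socket was closed
    }
  }

  def stop(): Unit = server.close()

  private def handle(socket: Socket): Unit = {
    val entry = (socket, new PrintWriter(socket.getOutputStream))
    clients.add(entry)
    daemon {
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream))
      try {
        var line = in.readLine()
        while (line != null) {
          forward(socket, line)
          line = in.readLine()
        }
      } catch {
        case _: IOException => () // client went away
      } finally {
        clients.remove(entry)
        socket.close()
      }
    }
  }

  // Sends the line to every client except the one it came from
  private def forward(from: Socket, line: String): Unit = {
    val it = clients.iterator()
    while (it.hasNext) {
      val (socket, out) = it.next()
      if (socket ne from) {
        out.print(line + "\n")
        out.flush()
      }
    }
  }

  private def daemon(body: => Unit): Unit = {
    val t = new Thread(new Runnable { def run(): Unit = body })
    t.setDaemon(true)
    t.start()
  }
}
```

With something like `new LineRelay(9000).start()` running in a separate process, both socketTextStream("localhost", 9000) and writeToSocket("localhost", 9000, ...) would connect to it as clients, and lines written by the sink should then reach the source.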

Cheers,
Till 

On Tue, Jan 31, 2017 at 9:04 PM, Li Peng <[hidden email]> wrote:
> [...]