Hi there,
I'm trying to test a couple of things by having one stream write to a socket and another stream read from that socket, but it keeps failing to connect:

Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
    at java.lang.Thread.run(Thread.java:745)

I tried

    writeToSocket(parameterTool.get("localhost"), parameterTool.getInt(9000), new SimpleStringSchema)

and even a custom sink:

    addSink(w => {
      val ia = InetAddress.getByName("localhost")
      val socket = new Socket(ia, 9000)
      val outStream = socket.getOutputStream
      val out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(outStream)))
      out.println(w)
      out.flush()
      out.close()
    })

But none of this seems to work. I'm fairly sure I set up the server correctly, since I can connect to it via telnet and other dummy echo clients I wrote. I can also have my data stream read from that same socket without any issues, but I can't seem to tell my stream to write to this socket without the above "Connection refused" error showing up.

Is there some nuance here I'm missing? Thanks!
Can you try opening a socket with netcat on localhost?

    nc -lk 9000

and see if this works? For me this works.

-- Jonas
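As a complement to the netcat check, here is a minimal sketch of how one might probe from the JVM whether anything is listening on a port before starting the job. The names `PortCheck` and `isListening` are hypothetical helpers made up for illustration, not part of Flink; the sketch only uses the standard `java.net` socket API.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object PortCheck {
  /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
  def isListening(host: String, port: Int, timeoutMs: Int = 1000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers ConnectException ("Connection refused") as well as timeouts.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }
}
```

With `nc -lk 9000` running, `PortCheck.isListening("localhost", 9000)` should return true; if it returns false, a "Connection refused" from any client connecting to that port is to be expected.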
Yes, I did open a socket with netcat. Turns out my first error was due to a stream without a sink triggering the socket connect. (I thought that without a sink the stream wouldn't affect anything, so I didn't comment it out, and I didn't open the socket for that port.)

However, I played with it some more, and I think the real issue is that I'm trying to have two streams, one writing to a port and another reading from the same port, i.e.

    val y = executionEnvironment.socketTextStream("localhost", 9000)
    x.writeToSocket("localhost", 9000, new SimpleStringSchema())

When I tested just the write or just the read, it worked, but combined I get this error:

java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.read1(BufferedReader.java:210)
    at java.io.BufferedReader.read(BufferedReader.java:286)
    at java.io.Reader.read(Reader.java:140)
    at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:101)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
    at java.lang.Thread.run(Thread.java:745)

Is this operation not allowed? I'm mainly writing to the same socket in order to pass work back and forth between streams.
Hi Li Peng,

I think what you're trying to do won't work. The problem is that you have two TCP clients (sink and source) which are supposed to connect to each other. Without a server that buffers the incoming data and forwards it to the outgoing connections, it won't be possible to read the previously written data.

The exception you're observing originates from the fact that the source tries to connect to TCP port 9000, which is not open (since there is no server listening on this port). The same would happen to the sink.

Cheers,
Till

On Tue, Jan 31, 2017 at 9:04 PM, Li Peng <[hidden email]> wrote:
> Yes I did open a socket with netcat. Turns out my first error was due
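To illustrate the point about needing a server between the two clients, here is a minimal sketch of a line-based relay, assuming plain `java.net` sockets and newline-delimited text. The class `LineRelay` and its methods are hypothetical names made up for this example and are not part of Flink. Unlike the buffering server described above, this sketch forwards each line only to clients that are connected at that moment, so a reader that connects late misses earlier data.

```scala
import java.io.{BufferedReader, IOException, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.net.{ServerSocket, Socket}
import java.nio.charset.StandardCharsets.UTF_8

/** Hypothetical relay: each line received from one client is forwarded to all other clients. */
class LineRelay(port: Int) {
  private val server = new ServerSocket(port) // port 0 picks a free port
  private var clients = Map.empty[Socket, PrintWriter] // guarded by this.synchronized

  def localPort: Int = server.getLocalPort

  /** Accepts clients on a daemon thread until stop() closes the server socket. */
  def start(): Unit = daemon {
    try {
      while (true) {
        val socket = server.accept()
        val out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream, UTF_8), true)
        synchronized { clients += (socket -> out) }
        daemon(pump(socket))
      }
    } catch { case _: IOException => () } // server socket was closed
  }

  /** Stops accepting new clients; existing connections are left as they are. */
  def stop(): Unit = server.close()

  /** Reads lines from one client and writes them to every other connected client. */
  private def pump(socket: Socket): Unit = {
    try {
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream, UTF_8))
      var line = in.readLine()
      while (line != null) {
        val snapshot = synchronized(clients)
        for ((other, out) <- snapshot if other ne socket) out.println(line)
        line = in.readLine()
      }
    } catch { case _: IOException => () } // client disconnected abruptly
    finally {
      synchronized { clients -= socket }
      socket.close()
    }
  }

  private def daemon(body: => Unit): Unit = {
    val t = new Thread(new Runnable { def run(): Unit = body })
    t.setDaemon(true)
    t.start()
  }
}
```

With something like `new LineRelay(9000).start()` running in a separate process, both `socketTextStream("localhost", 9000)` and `writeToSocket("localhost", 9000, new SimpleStringSchema())` would connect to it as clients instead of expecting to reach each other. As far as I know, the socket sink sends the serialized bytes as-is, so records may need a trailing newline before a line-based reader like this one can split them.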