Hi,

This is a simple example that I found using Flink Stream. I changed it so that the Flink client can be executed against a remote cluster, and so that it opens a socket server to ship its results to any other consumer machine. It seems to me that the socket server is not being opened in the remote cluster, but rather on my local machine (which I'm using to launch the app). How can I achieve that? I want to be able to ship results directly from the remote cluster, through a socket server that clients can use as a tap.

Sorry about the indentation:

    def main(args: Array[String]) {

      val env = StreamExecutionEnvironment.createRemoteEnvironment("myhostname", DefaultFlinkMasterPort,
        "myapp-assembly-0.1-SNAPSHOT.jar");

      //Read from a socket stream and map it to StockPrice objects
      val socketStockStream = env.socketTextStream("localhost", 9999).map(x => {
        val split = x.split(",")
        StockPrice(split(0), split(1).toDouble)
      })

      //Generate other stock streams
      val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
      val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
      val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
      val BUX_Stream = env.addSource(generateStock("BUX")(40) _)

      //Merge all stock streams together
      val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
      stockStream.print()

      // WHERE IS THE FOLLOWING CODE RUN?
      var out: PrintWriter = null
      new Thread {
        override def run(): Unit = {
          val serverSocket = new ServerSocket(12345)
          while (true) {
            val socket = serverSocket.accept()
            val hostname = socket.getInetAddress.getHostName.split('.').head
            println(s"Got a new connection from $hostname")
            out = new PrintWriter(socket.getOutputStream)
          }
        }
      }.start()

      stockStream.addSink(record => {
        if(out != null) {
          out.write(record)
          out.flush()
        }
      })

      env.execute("Stock stream")
    }

Thanks.

Your "SocketWriter-Thread" code will run on your client. All code in "main" runs on the client.

execute() itself runs on the client, too. Of course, it triggers the job submission to the cluster. In this step, the job assembled by the previous calls is translated into the JobGraph, which is submitted to the JobManager for execution.

You should start your SocketWriter-Thread manually on the cluster, i.e., if you use "localhost" in "env.socketTextStream", it must run on the TaskManager machine that executes this SocketStream-source task.

I guess it would be better not to use "localhost", but to start your SocketWriter-Thread on a dedicated machine in the cluster, and connect your SocketStream-source to this machine via its host name.

-Matthias

On 01/19/2016 03:57 PM, Saiph Kappa wrote:
> Hi,
>
> This is a simple example that I found using Flink Stream. [...]
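
A minimal sketch of the dedicated-machine setup suggested above, assuming a standalone feeder process. The object name `StockSocketFeeder` and the sample symbols and prices are hypothetical and not from the thread. The idea is that this program is started on a cluster machine instead of inside the client's main(), and that the job's source then connects to that machine by host name rather than "localhost":

```scala
import java.io.PrintWriter
import java.net.ServerSocket

// Hypothetical standalone feeder: start it on a dedicated cluster machine,
// NOT inside the Flink client's main().
object StockSocketFeeder {

  // Formats one record the way the job's map() parses it: "SYMBOL,price".
  def formatLine(symbol: String, price: Double): String = s"$symbol,$price"

  // Accepts a single client (e.g. the socketTextStream source task),
  // sends it the given lines, then closes the connection.
  def serveOnce(server: ServerSocket, lines: Seq[String]): Unit = {
    val socket = server.accept()
    try {
      val out = new PrintWriter(socket.getOutputStream)
      lines.foreach(line => out.print(line + "\n"))
      out.flush()
    } finally socket.close()
  }

  def main(args: Array[String]): Unit = {
    // 9999 matches the port used by env.socketTextStream in the question.
    val port = if (args.nonEmpty) args(0).toInt else 9999
    val server = new ServerSocket(port)
    println(s"Feeder listening on port $port")
    try {
      // Sample data only; a real feeder would stream live prices.
      val sample = Seq(formatLine("SPX", 1000.0), formatLine("FTSE", 500.5))
      while (true) serveOnce(server, sample)
    } finally server.close()
  }
}
```

With this running on a machine named, say, feeder-host (a made-up name), the source line in the job would read env.socketTextStream("feeder-host", 9999), so it no longer matters which TaskManager executes the source task.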
Thanks for your reply, Matthias. So it is not possible to open a socket server in the JobGraph and keep it open for the lifetime of the job, is that what you are saying? And is it required to have an external process open that socket server?

On Tue, Jan 19, 2016 at 5:38 PM, Matthias J. Sax <[hidden email]> wrote:
> Your "SocketWriter-Thread" code will run on your client. All code in "main" runs on the client. [...]

Yes (if I understand correctly what you aim for).
On 01/19/2016 05:57 PM, Saiph Kappa wrote:
> Thanks for your reply Mattias. So it is not possible to open a socket server in the JobGraph and having it open during the lifetime of the job, is that what you are saying? And it is required to have an external process to open that socket server. [...]
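
For the original goal (letting clients tap the job's results), the external process discussed above could look roughly like the following sketch. Everything in it is an assumption rather than something from the thread: the object name `TapRelay`, the ingest port 12346, and the idea that the job ships its records line by line to the relay through a socket sink (the DataStream API offers writeToSocket for this; check its exact signature for your Flink version). Port 12345 is the tap port from the original code.

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.{ServerSocket, Socket}
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical relay process, started on a known machine in the cluster.
// The job sends records to the ingest port; clients read them from the tap port.
object TapRelay {

  // Writers for all currently connected tap clients (thread-safe list).
  private val taps = new CopyOnWriteArrayList[PrintWriter]()

  // Registers a newly connected tap client.
  def addTap(socket: Socket): Unit =
    taps.add(new PrintWriter(socket.getOutputStream, true))

  // Forwards one record to every tap; drops taps whose connection has failed.
  def broadcast(line: String): Unit =
    taps.forEach { out =>
      out.println(line)
      if (out.checkError()) taps.remove(out)
    }

  def main(args: Array[String]): Unit = {
    val ingest = new ServerSocket(12346)    // assumed port the job writes to
    val tapServer = new ServerSocket(12345) // tap port from the question

    // Accept tap clients in the background.
    val acceptor = new Thread(() => while (true) addTap(tapServer.accept()))
    acceptor.setDaemon(true)
    acceptor.start()

    // Read records from the job, one connection at a time, and fan them out.
    while (true) {
      val job = ingest.accept()
      val in = new BufferedReader(new InputStreamReader(job.getInputStream))
      Iterator.continually(in.readLine()).takeWhile(_ != null).foreach(broadcast)
      job.close()
    }
  }
}
```

The relay keeps the socket server outside the JobGraph, so it stays up independently of where the sink tasks are scheduled, and clients always connect to the same host and port.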