Flink Stream: How to ship results through socket server

Flink Stream: How to ship results through socket server

Saiph Kappa
Hi,

This is a simple example that I found using Flink Streaming. I changed it so that the Flink client can be executed against a remote cluster, and so that it opens a socket server to ship its results to any other consumer machine. It seems to me that the socket server is not being opened on the remote cluster, but rather on my local machine (the one I use to launch the app). How can I achieve that? I want to ship results directly from the remote cluster, through a socket server that clients can use as a tap.

Sorry about indentation:

import java.io.PrintWriter
import java.net.ServerSocket

import org.apache.flink.streaming.api.scala._

def main(args: Array[String]) {

  val env = StreamExecutionEnvironment.createRemoteEnvironment(
    "myhostname", DefaultFlinkMasterPort, "myapp-assembly-0.1-SNAPSHOT.jar")

  // Read from a socket stream and map it to StockPrice objects
  val socketStockStream = env.socketTextStream("localhost", 9999).map(x => {
    val split = x.split(",")
    StockPrice(split(0), split(1).toDouble)
  })

  // Generate other stock streams
  val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
  val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
  val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
  val BUX_Stream = env.addSource(generateStock("BUX")(40) _)

  // Merge all stock streams together
  val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream,
    DJI_Stream, BUX_Stream)

  stockStream.print()

  // WHERE IS THE FOLLOWING CODE RUN?
  var out: PrintWriter = null
  new Thread {
    override def run(): Unit = {
      val serverSocket = new ServerSocket(12345)
      while (true) {
        val socket = serverSocket.accept()
        val hostname = socket.getInetAddress.getHostName.split('.').head
        println(s"Got a new connection from $hostname")
        out = new PrintWriter(socket.getOutputStream)
      }
    }
  }.start()

  stockStream.addSink(record => {
    if (out != null) {
      out.write(record.toString)
      out.flush()
    }
  })

  env.execute("Stock stream")
}
Thanks.

Re: Flink Stream: How to ship results through socket server

Matthias J. Sax
Your "SocketWriter-Thread" code will run on your client. All code in
"main" runs on the client.

execute() itself runs on the client, too. Of course, it triggers the job
submission to the cluster. In this step, the assembled job from the
previous calls is translated into the JobGraph which is submitted to the
JobManager for execution.

You should start your SocketWriter-Thread manually on the cluster, ie,
if you use "localhost" in "env.socketTextStream", it must be the
TaskManager machine that executes this SocketStream-source task.

I guess, it would be better not to use "localhost", but start your
SocketWriter-Thread on a dedicated machine in the cluster, and connect
your SocketStream-source to this machine via its host name.

-Matthias
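
To make the "where does it run" point concrete, here is a minimal sketch (not code from this thread) of a sink whose connection is opened inside the operator: a RichSinkFunction's open() runs on the TaskManager that executes the sink task, so the connection to the dedicated machine is made from the cluster rather than from the client submitting the job. The class name SocketShippingSink, the host "tap-host", and port 12346 are made up for illustration.

  import java.io.PrintWriter
  import java.net.Socket

  import org.apache.flink.configuration.Configuration
  import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

  // Hypothetical sink: the connection is created in open(), which runs on the
  // TaskManager that executes the sink task, not on the client that runs main().
  class SocketShippingSink(host: String, port: Int) extends RichSinkFunction[StockPrice] {
    @transient private var out: PrintWriter = _

    override def open(parameters: Configuration): Unit = {
      // Connect (as a client) to a socket server started separately on the dedicated machine.
      out = new PrintWriter(new Socket(host, port).getOutputStream, true)
    }

    override def invoke(value: StockPrice): Unit = {
      out.println(value.toString)
    }

    override def close(): Unit = {
      if (out != null) out.close()
    }
  }

  // Usage in the job, instead of the addSink(record => ...) lambda:
  // stockStream.addSink(new SocketShippingSink("tap-host", 12346))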




Re: Flink Stream: How to ship results through socket server

Saiph Kappa
Thanks for your reply, Matthias. So it is not possible to open a socket server from within the JobGraph and keep it open for the lifetime of the job, is that what you are saying? And an external process is required to open that socket server?


Re: Flink Stream: How to ship results through socket server

Matthias J. Sax
Yes (if I understand correctly what you aim for).
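
For illustration, a minimal sketch of such an external process, assuming it runs on a dedicated machine in the cluster: it accepts the Flink sink as a producer on one port and fans its lines out to any number of consumer taps on another. The object name ResultTap and both port numbers are made up; this is not code from the thread.

  import java.io.{BufferedReader, InputStreamReader, PrintWriter}
  import java.net.ServerSocket

  import scala.collection.mutable

  // Hypothetical standalone "tap" process, started manually on a dedicated machine.
  // The Flink sink connects to port 12346 and pushes one result line per record;
  // any number of consumers connect to port 12345 and receive every line.
  object ResultTap {
    private val consumers = mutable.Set[PrintWriter]()

    def main(args: Array[String]): Unit = {
      // Accept consumer (tap) connections in the background.
      new Thread {
        override def run(): Unit = {
          val consumerServer = new ServerSocket(12345)
          while (true) {
            val socket = consumerServer.accept()
            consumers.synchronized {
              consumers += new PrintWriter(socket.getOutputStream, true)
            }
          }
        }
      }.start()

      // Accept the producer (the Flink sink) and broadcast its lines to all consumers.
      val producerServer = new ServerSocket(12346)
      while (true) {
        val producer = producerServer.accept()
        val reader = new BufferedReader(new InputStreamReader(producer.getInputStream))
        var line = reader.readLine()
        while (line != null) {
          consumers.synchronized {
            consumers.foreach(_.println(line))
          }
          line = reader.readLine()
        }
      }
    }
  }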
