Hello,
I am trying to separate the logic of my application by generating and processing data on different physical machines. I have created my custom socket source class:

==========
class SocketSourceFunction extends SourceFunction[Event2] {
  @volatile private var isRunning: Boolean = true
  @transient private var serverSocket: ServerSocketChannel = null

  override def run(ctx: SourceContext[Event2]) = {
    val hostname = "localhost"
    val port = 6667
    println("listening:" + port)
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress(hostname, port))
    val buffer = ByteBuffer.allocate(68)
    val des = new EventDeSerializer2()

    while (isRunning) {
      println("waiting...")
      val socketChannel = server.accept()

      if (socketChannel != null) {
        println("accept:" + socketChannel)
        while (true) {
          val bytes = socketChannel.read(buffer)
          if (bytes > 0) {
            if (!buffer.hasRemaining()) {
              buffer.rewind()
              val event: Event2 = des.deserialize(buffer.array())
              ctx.collect(event)
              buffer.clear()
            }
          }
        }
      }
    }
  }

  override def cancel() = {
    isRunning = false
    val socket = this.serverSocket
    if (socket != null) {
      try {
        socket.close()
      } catch {
        case e: Throwable =>
          System.err.println(String.format("error: %s", e.getMessage()))
          e.printStackTrace()
          System.exit(1)
      }
    }
  }
}
==========

I am sending data either with raw sockets using ByteBuffers or with a Flink generator (serializing my events and using the writeToSocket() method). However, in both cases I see less than one tenth of the throughput of in-memory generation, even over a 10 Gbit connection.

Is there any obvious defect in my implementation?

Thank you in advance,
George
Hi George,
I suspect issuing a read operation for every 68 bytes incurs too much overhead to reach the throughput you are after. Instead, allocate a bigger buffer (64 KB?) and extract the individual events from sub-regions of that buffer (see the sketch after this message). Note, however, that the buffered events will then only be processed once the read() call returns, and how much data a single read() transfers depends on the underlying channel [1]. At some point this becomes a trade-off between latency and throughput. If you set non-blocking mode on your channels, a read will always return whatever the channel currently has available and continue immediately. You can set this up, for example, like this:

==========
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("jenkov.com", 80));

while (!socketChannel.finishConnect()) {
    // wait, or do something else...
}
==========

Nico

[1] https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html#read(java.nio.ByteBuffer)
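For illustration, such a batched read loop could look roughly like the following sketch. It reuses the 68-byte event size, EventDeSerializer2, and the surrounding run() loop from your code; the EVENT_SIZE and record names are just for the example.

==========
// Sketch only, replacing the inner read loop of run():
// reads are batched into a 64 KB buffer and complete 68-byte events
// are sliced out of it one by one.
val EVENT_SIZE = 68
val buffer = ByteBuffer.allocate(64 * 1024)
val record = new Array[Byte](EVENT_SIZE)

while (isRunning) {
  val bytes = socketChannel.read(buffer) // one read may deliver many events
  if (bytes > 0) {
    buffer.flip() // switch the buffer from writing to reading
    while (buffer.remaining() >= EVENT_SIZE) {
      buffer.get(record) // copy out one complete event
      ctx.collect(des.deserialize(record))
    }
    buffer.compact() // keep any partial trailing event for the next read
  }
}
==========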
(back to the ml again)
If you implement the ParallelSourceFunction interface instead, Flink will run as many source instances as the configured parallelism. Each instance will run the same code, so you will have multiple sockets to connect to, if that is what you wanted.

One more thing regarding your source: typically you would want to hold the checkpoint lock around the collect() call, i.e.

==========
synchronized (ctx.getCheckpointLock()) {
  ctx.collect(...)
}
==========

A sketch combining both points follows below the quoted message.

Nico

On 16/01/18 12:27, George Theodorakis wrote:
> Thank you very much, indeed this was my bottleneck.
>
> My problem now is that my source is not parallel, so when I am
> increasing parallelism, the system's throughput falls.
>
> Is opening multiple sockets a quick solution to make the source parallel?
>
> G.
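For illustration, a parallel variant could look roughly like the sketch below. It assumes RichParallelSourceFunction (so each subtask can look up its index) and a basePort-plus-subtask-index port scheme; both are just one possible choice, not something prescribed by Flink.

==========
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Sketch only: each parallel subtask binds its own server socket on a
// distinct port and emits events under the checkpoint lock.
class ParallelSocketSource(basePort: Int)
    extends RichParallelSourceFunction[Event2] {

  @volatile private var isRunning = true

  override def run(ctx: SourceContext[Event2]): Unit = {
    val port = basePort + getRuntimeContext.getIndexOfThisSubtask
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress("localhost", port))
    while (isRunning) {
      // ... accept a connection and read/deserialize events as before ...
      val event: Event2 = ??? // next deserialized event from the socket
      ctx.getCheckpointLock.synchronized { // hold the lock while emitting
        ctx.collect(event)
      }
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
==========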
Taking a step back: why do you want to manually implement communication via sockets in the first place? With this you will not get any fault-tolerance guarantees and I would guess that maintaining a custom solution is more difficult than using, say, Kafka.
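For example, consuming the events from Kafka instead could look roughly like the sketch below (assuming Flink 1.4's FlinkKafkaConsumer011; the topic name and addresses are placeholders, and it presumes EventDeSerializer2 implements DeserializationSchema[Event2]):

==========
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Sketch only: a Kafka-backed source with fault-tolerance guarantees,
// replacing the hand-rolled socket source.
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "event2-consumer")

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[Event2] = env.addSource(
  new FlinkKafkaConsumer011[Event2]("events", new EventDeSerializer2(), props))
==========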
Best,
Aljoscha