Experiencing long latency while using sockets

Chao Wang
Hi,

I have been trying to benchmark the end-to-end latency of a Flink 1.3.1
application, but I am confused about the amount of time spent in
Flink. In my setup, the data source and data sink reside on separate
machines, as in the following topology:

Machine 1                                 Machine 2      Machine 3
data source (via a socket client)   ->    Flink     ->   data sink (via a socket server)

I observed 200-400 milliseconds of end-to-end latency, even though my
stream transformations take no more than two milliseconds to execute,
the socket-only network latency between machines is no more than one
millisecond, and I used ptpd so that the clock offset between machines
is also no more than one millisecond.

Question: What took those hundreds of milliseconds?

Here are the details of my setting and my observation so far:

On Machine 2, I implemented a socket server as a data source for Flink
(by implementing SourceFunction), and I split the incoming stream into
several streams (using SplitStream) for some transformations (implementing
MapFunction and CoFlatMapFunction), whose results were written to a socket
(using writeToSocket). I used C++11's chrono library (through JNI)
to take timestamps and determine the elapsed time, and I have verified
that the overhead of timestamping this way is no more than one millisecond.
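
For comparison, the same kind of timestamping can be sketched in pure Java, without JNI. This is not the code used in this post: the class name `StampedFrame`, its methods, and the frame layout (one type byte, an 8-byte big-endian timestamp in microseconds since the Unix epoch, then the payload) are assumptions made only for illustration. As with the chrono approach, the result is only meaningful across machines if their clocks are synchronized.

```java
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Hypothetical helper: frames a payload with a type byte and a send timestamp. */
final class StampedFrame {
    static final int HEADER_BYTES = 1 + Long.BYTES; // type byte + 8-byte timestamp

    /** Wall-clock time in microseconds since the Unix epoch. */
    static long nowMicros() {
        return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.now());
    }

    /** Builds [type][timestamp as big-endian long][payload]. */
    static byte[] encode(byte type, long sendMicros, byte[] payload) {
        return ByteBuffer.allocate(HEADER_BYTES + payload.length)
                .put(type)
                .putLong(sendMicros)
                .put(payload)
                .array();
    }

    /** Reads the send timestamp back out of a frame. */
    static long sendMicros(byte[] frame) {
        return ByteBuffer.wrap(frame).getLong(1); // absolute read at offset 1
    }

    /** Microseconds between the embedded send time and receiveMicros. */
    static long elapsedMicros(byte[] frame, long receiveMicros) {
        return receiveMicros - sendMicros(frame);
    }
}
```

The sender would call `StampedFrame.encode(type, StampedFrame.nowMicros(), payload)` just before writing to the socket, and the receiver would call `StampedFrame.elapsedMicros(frame, StampedFrame.nowMicros())` right after reading a full frame.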

I observed that for four consecutive writes from Machine 1, spaced no
more than 0.3 milliseconds apart, Flink on Machine 2 received the first
write within 0.2 milliseconds, but then took 90 milliseconds to receive
the second write, another 4 milliseconds for the third, and yet another
4 milliseconds for the fourth.

It then took more than 70 milliseconds before Flink started processing
the first stream transformation in my plan. After my last
transformation, it took more than 70 milliseconds before the result was
received at Machine 3.


Thank you,

Chao


Re: Experiencing long latency while using sockets

Chao Wang
Following up on the original post: I have stripped my Flink app down to
just the following, and it still exhibits long latencies. After the
second write to the source socket, it took 90+ milliseconds for the data
to get from the data source to the socket-front in Flink. I would
appreciate pointers on how to investigate a latency issue like this and,
in general, on how to properly benchmark Flink latencies. Thank you very much!


The main method:


   public static void main(String[] args) throws Exception {
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     DataStream<EventGroup> inEventGroupStream = env.addSource(new SocketEventGroupStreamFunction(6065, 512));
     inEventGroupStream.writeToSocket("DestHost", 6066, new MySeGroup<EventGroup>());
     env.execute("event processing");
   }


where all the custom classes are as follows (for
serialization/deserialization and socket server functionality):


   public static class MySeGroup<T> implements SerializationSchema<EventGroup> {

     @Override
     public byte[] serialize(EventGroup arg0) {
       int tLength = EKFFFTAES.getSizeTimepoint();
       //Note: report error if tLength != arg0.getT().length
       if (tLength != arg0.getT().length) {
         System.out.println ("Serialization error: Timepoint size discrepancy.");
         System.out.println ("tLength = " + tLength);
         System.out.println ("arg0.getT().length = " + arg0.getT().length);
       }
       byte[] buffer = new byte[1 + arg0.getT().length + arg0.getP().length];
       buffer[0] = arg0.type;
       System.arraycopy(arg0.getT(), 0, buffer, 1, tLength);
       System.arraycopy(arg0.getP(), 0, buffer, 1 + tLength, arg0.getP().length);
       return buffer;
     }
   }

   public static class Event extends SimpleImmutableEntry<byte[],byte[]> {

     Event(byte[] timestamp, byte[] payload){
       super(timestamp, payload);
     }
     public byte[] getT() { // get the timestamp
       return getKey();
     }
     public byte[] getP() { // get the payload
       return getValue();
     }
   }

   public static class EventGroup extends Event {
     public byte type;
     EventGroup(byte type, byte[] timestamp, byte[] payload){
       super(timestamp, payload);
       this.type = type;
     }
   }


   public static class SocketEventGroupStreamFunction implements SourceFunction<EventGroup> {

     private transient ServerSocket serverSocket;
     private int serverPort;
     private int dataLength;
     private byte[] inbuf;
     private byte[] timestamp;
     private byte[] payload;
     private int tLength = EKFFFTAES.getSizeTimepoint();
     private volatile boolean isRunning = true;

     public SocketEventGroupStreamFunction(int port, int length) {
       serverPort = port;
       dataLength = length;
       inbuf = new byte[1 + dataLength + tLength];
       timestamp = new byte[tLength];
       payload = new byte[dataLength];
     }

     @Override
     public void run(SourceContext<EventGroup> ctx) throws Exception {
       while(isRunning) {
         serverSocket = new ServerSocket(serverPort, 100, InetAddress.getByName("192.168.1.13"));
         serverSocket.setSoTimeout(1000000);
         System.out.println("Waiting for incoming connections on port " +
           serverSocket.getLocalPort() + "...");
         Socket server = serverSocket.accept();

         System.out.println("Just connected to " + server.getRemoteSocketAddress());
         DataInputStream in = new DataInputStream(server.getInputStream());

         while(isRunning) {
           in.readFully(inbuf, 0, inbuf.length);
           System.arraycopy(inbuf, 1, timestamp, 0, tLength);
           System.arraycopy(inbuf, 1+tLength, payload, 0, dataLength);

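           System.out.print("Got an event " + inbuf[0] + ": ");
           // displayElapsedTime is a static native method of EKFFFTAES
           EKFFFTAES.displayElapsedTime(timestamp);

           // clone the arrays so later reads do not overwrite emitted events
           ctx.collect(new EventGroup(inbuf[0], timestamp.clone(), payload.clone()));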
         }
       }
     }

     @Override
     public void cancel() {
       isRunning = false;
       ServerSocket theSocket = this.serverSocket;
       if (theSocket != null) {
         try {
           theSocket.close();
         }catch(SocketTimeoutException s) {
           System.out.println("Socket timed out!");
         }catch(IOException e) {
           e.printStackTrace();
         }
       }
     }
   }


and finally, EKFFFTAES is my C++ library implementing the timestamping
facility:


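#include <jni.h>
#include <chrono>
#include <cstring>
#include <iostream>

int timePointLength = sizeof(std::chrono::system_clock::time_point);

// Returns the size in bytes of a raw system_clock::time_point.
JNIEXPORT jint JNICALL Java_eventProcessing_EKFFFTAES_getSizeTimepoint
   (JNIEnv *, jclass)
{
   return ::timePointLength;
}

// Prints the microseconds elapsed since the time_point stored in inArray.
JNIEXPORT void JNICALL Java_eventProcessing_EKFFFTAES_displayElapsedTime
   (JNIEnv *env, jclass, jbyteArray inArray)
{
   std::chrono::system_clock::time_point end =
     std::chrono::system_clock::now();
   jbyte *inCArray = env->GetByteArrayElements(inArray, NULL);
   std::chrono::system_clock::time_point start;
   std::memcpy (&start, inCArray, ::timePointLength);
   // Release the elements; JNI_ABORT skips the copy-back because they were only read.
   env->ReleaseByteArrayElements(inArray, inCArray, JNI_ABORT);
   std::cout << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() << std::endl;
}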


Thank you,

Chao

On 08/07/2017 03:20 PM, Chao Wang wrote:

> [...]

Re: Experiencing long latency while using sockets

Fabian Hueske-2
One pointer is the StreamExecutionEnvironment.setBufferTimeout() parameter.
Flink's network stack collects records in buffers to send them over the network. A buffer is sent when it is completely filled or after a configurable timeout.
So if your program does not process many records, these records might "get stuck" in the buffers and be emitted after the timeout flushes the buffer.
The default timeout is 100 ms. Try reducing it.
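
In a job, the setting is a single call on the environment, for example `env.setBufferTimeout(5)` (the value is in milliseconds; 5 is an arbitrary illustration). To make the flush rule concrete, here is a toy model of "send when full or after a timeout". It is not Flink's implementation: the class `TimedBuffer` and its methods are hypothetical, and time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of "flush when full or when the timeout expires"; not Flink code. */
final class TimedBuffer {
    private final int capacity;
    private final long timeoutMillis;
    private final List<String> records = new ArrayList<>();
    private long oldestMillis; // arrival time of the oldest buffered record

    TimedBuffer(int capacity, long timeoutMillis) {
        this.capacity = capacity;
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds a record; returns the flushed batch if the buffer became full, else an empty list. */
    List<String> add(String record, long nowMillis) {
        if (records.isEmpty()) {
            oldestMillis = nowMillis;
        }
        records.add(record);
        return records.size() >= capacity ? drain() : new ArrayList<>();
    }

    /** Called periodically; flushes if the oldest record has waited for the full timeout. */
    List<String> tick(long nowMillis) {
        boolean expired = !records.isEmpty() && nowMillis - oldestMillis >= timeoutMillis;
        return expired ? drain() : new ArrayList<>();
    }

    /** Returns a copy of the buffered records and empties the buffer. */
    private List<String> drain() {
        List<String> out = new ArrayList<>(records);
        records.clear();
        return out;
    }
}
```

With a capacity of 3 and a timeout of 100 ms, a single record added at time 0 is still held at `tick(50)` and only leaves the buffer at `tick(100)`, which is the "stuck" case for a low-rate stream.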

Best, Fabian

2017-08-08 1:06 GMT+02:00 Chao Wang <[hidden email]>:
[...]

Re: Experiencing long latency while using sockets

Chao Wang

Thank you, Fabian.

Maybe there are also some buffers sitting between the data source and the first operator? In my implementation of SourceFunction (using a socket server, as listed in the previous email), when receiving two messages, I observed that, in terms of event time, it took 0.2 ms for the SourceFunction to receive the first message but then 97 ms to receive the second. The interval between the two sends is 0.07 ms at the sending side, which is a Java socket client.

Or could it be that there is a timeout setting for scheduling the data source in Flink?


Thanks,

Chao


On 08/08/2017 02:58 AM, Fabian Hueske wrote:
[...]

Re: Experiencing long latency while using sockets

Chao Wang

It seems that the observed long latencies were due to some one-time internal mechanism that occurs only after Flink has received the first message. Based on my measurements, that mechanism takes around 100 ms.

I have now set up my application as follows, and I observe end-to-end latency similar to that of raw sockets (off by less than 1 ms): send the first message to Flink, then wait 110 ms before sending the second message. For subsequent sends, the 110 ms wait is not needed.
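
The send pattern can be sketched as follows. This is an illustration rather than the actual client: the class `WarmupSender` and the method `sendAll` are hypothetical names, and a `ByteArrayOutputStream` stands in for the socket's output stream so the sketch runs on its own.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sender that pauses once, after the first message only. */
final class WarmupSender {
    /** Writes each message to out, sleeping pauseMillis after the first one. */
    static void sendAll(OutputStream out, List<byte[]> messages, long pauseMillis)
            throws IOException, InterruptedException {
        boolean first = true;
        for (byte[] message : messages) {
            out.write(message);
            out.flush(); // push each message out immediately
            if (first) {
                Thread.sleep(pauseMillis); // one-time wait, e.g. 110 ms
                first = false;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // In a real client this would be socket.getOutputStream().
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        sendAll(out, Arrays.asList(new byte[8], new byte[8], new byte[8]), 110);
        System.out.println(out.size() + " bytes written");
    }
}
```

Only the first message pays the wait; every later message is written back to back.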


Chao

On 08/09/2017 10:57 AM, Chao Wang wrote:

Thank you, Fabian.

Maybe there's also some buffers sit between data source and the first operator? I observed that in my implementation of SourceFunction (using a socket server, as listed in the previous email), for receiving two messages, in terms of event time, it took 0.2 ms before the SourceFunction receives the first message but then it took 97 ms to receive the second message. The interval between the two sends is 0.07 ms at the sending side, which is a java socket client.

Or could it be that there is a timeout setting for scheduling data source in Flink?


Thanks,

Chao


On 08/08/2017 02:58 AM, Fabian Hueske wrote:
One pointer is the StreamExecutionEnvironment.setBufferTimeout() parameter.
Flink's network stack collects records in buffers to send them over the network. A buffer is sent when it is completely filled or after a configurable timeout.
So if your program does not process many records, these records might "get stuck" in the buffers and be emitted after the timeout flushes the buffer.
The default timeout is 100ms. Try to reduce it.

Best, Fabian

2017-08-08 1:06 GMT+02:00 Chao Wang <[hidden email]>:
Following the original post, I've tried stripping down my Flink app to only the following, and then it still exhibits long latencies: after the second source socket write, it took 90+ milliseconds from data source to the socket-front in Flink. I would like to ask for pointers about how to investigate the latency issue like this, and in general how to properly benchmark Flink latencies. Thank you very much!


The main method:


  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<EventGroup> inEventGroupStream = env.addSource(new SocketEventGroupStreamFunction(6065, 512));
    inEventGroupStream.writeToSocket("DestHost", 6066, new MySeGroup<EventGroup>());
    env.execute("event processing");
 }


where all the custom classes are as follows (for serialization/deserialization and socket server functionality):


  public static class MySeGroup<T> implements SerializationSchema<EventGroup> {

    @Override
    public byte[] serialize(EventGroup arg0) {
      int tLength = EKFFFTAES.getSizeTimepoint();
      //Note: report error if tLength != arg0.getT().length
      if (tLength != arg0.getT().length) {
        System.out.println ("Serialization error: Timepoint size discrepancy.");
        System.out.println ("tLength = " + tLength);
        System.out.println ("arg0.getT().length = " + arg0.getT().length);
      }
      byte[] buffer = new byte[1 + arg0.getT().length + arg0.getP().length];
      buffer[0] = arg0.type;
      System.arraycopy(arg0.getT(), 0, buffer, 1, tLength);
      System.arraycopy(arg0.getP(), 0, buffer, 1 + tLength, arg0.getP().length);
      return buffer;
    }
  }

  public static class Event extends SimpleImmutableEntry<byte[],byte[]> {

    Event(byte[] timestamp, byte[] payload){
      super(timestamp, payload);
    }
    public byte[] getT() { // get the timestamp
      return getKey();
    }
    public byte[] getP() { // get the payload
      return getValue();
    }
  }

  public static class EventGroup extends Event {
    public byte type;
    EventGroup(byte type, byte[] timestamp, byte[] payload){
      super(timestamp, payload);
      this.type = type;
    }
  }


  public static class SocketEventGroupStreamFunction implements SourceFunction<EventGroup> {

    private transient ServerSocket serverSocket;
    private int serverPort;
    private int dataLength;
    private byte[] inbuf;
    private byte[] timestamp;
    private byte[] payload;
    private int tLength = EKFFFTAES.getSizeTimepoint();
    private volatile boolean isRunning = true;

    public SocketEventGroupStreamFunction(int port, int length) {
      serverPort = port;
      dataLength = length;
      inbuf = new byte[1 + dataLength + tLength];
      timestamp = new byte[tLength];
      payload = new byte[dataLength];
    }

    @Override
    public void run(SourceContext<EventGroup> ctx) throws Exception {
      while(isRunning) {
        serverSocket = new ServerSocket(serverPort, 100, InetAddress.getByName("192.168.1.13"));
        serverSocket.setSoTimeout(1000000);
        System.out.println("Waiting for incoming connections on port " +
          serverSocket.getLocalPort() + "...");
        Socket server = serverSocket.accept();

        System.out.println("Just connected to " + server.getRemoteSocketAddress());
        DataInputStream in = new DataInputStream(server.getInputStream());

        while(isRunning) {
          in.readFully(inbuf, 0, inbuf.length);
          System.arraycopy(inbuf, 1, timestamp, 0, tLength);
          System.arraycopy(inbuf, 1+tLength, payload, 0, dataLength);

          System.out.print("Got an event " + inbuf[0] + ": ");
          displayElapsedTime(timestamp);

          ctx.collect(new EventGroup(inbuf[0], timestamp, payload));
        }
      }
    }

    @Override
    public void cancel() {
      isRunning = false;
      ServerSocket theSocket = this.serverSocket;
      if (theSocket != null) {
        try {
          theSocket.close();
        }catch(SocketTimeoutException s) {
          System.out.println("Socket timed out!");
        }catch(IOException e) {
          e.printStackTrace();
        }
      }
    }
  }


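And finally, EKFFFTAES is my C++ library implementing the timestamping facility:


#include <jni.h>
#include <chrono>
#include <cstring>
#include <iostream>

// Assumes the javah-generated header (not shown) declares these functions
// extern "C" so that the JVM can resolve them by name.
int timePointLength = sizeof(std::chrono::system_clock::time_point);

JNIEXPORT jint JNICALL Java_eventProcessing_EKFFFTAES_getSizeTimepoint
  (JNIEnv *, jclass)
{
  return ::timePointLength;
}

JNIEXPORT void JNICALL Java_eventProcessing_EKFFFTAES_displayElapsedTime
  (JNIEnv *env, jclass, jbyteArray inArray)
{
  std::chrono::system_clock::time_point end =
    std::chrono::system_clock::now();
  jbyte *inCArray = env->GetByteArrayElements(inArray, NULL);
  if (inCArray == NULL) return; // the JVM could not provide the array contents
  std::chrono::system_clock::time_point start;
  std::memcpy (&start, inCArray, ::timePointLength);
  // Release the elements; JNI_ABORT skips the copy-back because nothing was modified.
  env->ReleaseByteArrayElements(inArray, inCArray, JNI_ABORT);
  // Print the elapsed time in microseconds.
  std::cout << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() << std::endl;
}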


Thank you,

Chao



Re: Experiencing long latency while using sockets

Fabian Hueske-2
Great! Thanks for reporting back :-)

2017-08-09 22:52 GMT+02:00 Chao Wang <[hidden email]>:

It seems that the observed long latencies were due to a one-time internal mechanism that kicks in only after Flink has received the first message. By my measurements, that mechanism took around 100 ms.

I have now set up my application as follows, and the end-to-end latency is close to that of raw sockets (within 1 ms): send the first message to Flink, then wait 110 ms before sending the second message. The 110 ms wait is not needed for subsequent sends.
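
The send pattern described above could look roughly like the following on the client side. This is only a sketch: the class and method names (WarmupSender, send) are made up for illustration, and the pause length is passed in as a parameter (110 ms in the measurement above).

```java
import java.io.IOException;
import java.io.OutputStream;

public class WarmupSender {
    /**
     * Writes every message to the stream, pausing once after the first
     * message so that the receiver's one-time start-up cost is paid before
     * the measured messages are sent.
     */
    static void send(OutputStream out, byte[][] messages, long warmupMs)
            throws IOException, InterruptedException {
        for (int i = 0; i < messages.length; i++) {
            out.write(messages[i]);
            out.flush(); // push each message out immediately
            if (i == 0 && messages.length > 1) {
                Thread.sleep(warmupMs); // one-time wait after the first message
            }
        }
    }
}
```

With a real connection, `out` would be the output stream of the client socket, and the call would be something like `WarmupSender.send(socket.getOutputStream(), messages, 110)`.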


Chao


On 08/09/2017 10:57 AM, Chao Wang wrote:

Thank you, Fabian.

Could there also be some buffers sitting between the data source and the first operator? In my implementation of SourceFunction (using a socket server, as listed in the previous email), I measured the receipt of two messages in terms of event time: it took 0.2 ms before the SourceFunction received the first message, but 97 ms before it received the second. The interval between the two sends was 0.07 ms at the sending side, which is a Java socket client.

Or could it be that Flink has a timeout setting for scheduling the data source?
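
To make the size of that stall concrete, the gap between the two arrivals can be computed from the send times and per-message latencies. The sketch below reads the figures above as send-to-receive latencies (an interpretation, not something the measurement states explicitly); the class and method names are hypothetical.

```java
public class ArrivalGaps {
    /**
     * Given send times and send-to-receive latencies (both in microseconds),
     * returns the gaps between consecutive arrivals at the receiver.
     */
    static long[] arrivalGapsMicros(long[] sendMicros, long[] latencyMicros) {
        long[] gaps = new long[Math.max(0, sendMicros.length - 1)];
        for (int i = 1; i < sendMicros.length; i++) {
            long previousArrival = sendMicros[i - 1] + latencyMicros[i - 1];
            long currentArrival = sendMicros[i] + latencyMicros[i];
            gaps[i - 1] = currentArrival - previousArrival;
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Sends 0.07 ms apart; latencies of 0.2 ms and 97 ms.
        long[] gaps = arrivalGapsMicros(new long[]{0, 70}, new long[]{200, 97_000});
        System.out.println(gaps[0] + " us"); // prints "96870 us"
    }
}
```

Under that reading, the receiver sees the two messages almost 97 ms apart even though they were sent 0.07 ms apart.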


Thanks,

Chao


On 08/08/2017 02:58 AM, Fabian Hueske wrote:
One pointer is the StreamExecutionEnvironment.setBufferTimeout() parameter.
Flink's network stack collects records in buffers to send them over the network. A buffer is sent when it is completely filled or after a configurable timeout.
So if your program does not process many records, these records might "get stuck" in the buffers and be emitted after the timeout flushes the buffer.
The default timeout is 100ms. Try to reduce it.
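
The flush rule described above (send a buffer when it is full or when the timeout expires) can be illustrated with a small stand-alone model. This is a simplified sketch, not Flink's implementation: the class and method names are hypothetical, and it assumes the timeout is counted from the first record placed in a buffer, whereas Flink's real flusher may be scheduled differently.

```java
import java.util.Arrays;

public class BufferTimeoutModel {
    /**
     * For each record arrival time (ms, ascending), returns the time at which
     * the buffer holding that record is flushed: either when the buffer
     * reaches `capacity` records or `timeoutMs` after its first record.
     */
    static long[] emitTimes(long[] arrivals, int capacity, long timeoutMs) {
        long[] out = new long[arrivals.length];
        int start = 0; // index of the first record in the current buffer
        while (start < arrivals.length) {
            long deadline = arrivals[start] + timeoutMs;
            long flushAt = deadline;
            int end = start; // exclusive end of the records in this buffer
            while (end < arrivals.length && arrivals[end] <= deadline) {
                end++;
                if (end - start == capacity) {
                    flushAt = arrivals[end - 1]; // buffer is full: flush right away
                    break;
                }
            }
            for (int i = start; i < end; i++) {
                out[i] = flushAt;
            }
            start = end;
        }
        return out;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 1, 2, 3}; // four records, far too few to fill a buffer
        System.out.println(Arrays.toString(emitTimes(arrivals, 1000, 100))); // [100, 100, 100, 100]
        System.out.println(Arrays.toString(emitTimes(arrivals, 1000, 5)));   // [5, 5, 5, 5]
    }
}
```

In this model a low-rate stream never fills the buffer, so every record waits for the timeout; lowering the timeout (in a real job, via StreamExecutionEnvironment.setBufferTimeout()) shortens that wait.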

Best, Fabian

2017-08-08 1:06 GMT+02:00 Chao Wang <[hidden email]>:
Following up on the original post, I've stripped my Flink app down to only the following, and it still exhibits long latencies: after the second write to the source socket, it took 90+ milliseconds for the data to get from the data source to the socket front end in Flink. I would appreciate pointers on how to investigate a latency issue like this, and more generally on how to properly benchmark Flink latencies. Thank you very much!


The main method:


  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<EventGroup> inEventGroupStream = env.addSource(new SocketEventGroupStreamFunction(6065, 512));
    inEventGroupStream.writeToSocket("DestHost", 6066, new MySeGroup<EventGroup>());
    env.execute("event processing");
  }


where all the custom classes are as follows (for serialization/deserialization and socket server functionality):


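  // Imports used by the classes below (they belong at the top of the enclosing
  // file; the Flink package names are those of Flink 1.3.x):
  //   java.io.DataInputStream, java.io.IOException, java.net.InetAddress,
  //   java.net.ServerSocket, java.net.Socket,
  //   java.util.AbstractMap.SimpleImmutableEntry,
  //   org.apache.flink.streaming.api.functions.source.SourceFunction,
  //   org.apache.flink.streaming.util.serialization.SerializationSchema

  public static class MySeGroup<T> implements SerializationSchema<EventGroup> {

    @Override
    public byte[] serialize(EventGroup arg0) {
      int tLength = EKFFFTAES.getSizeTimepoint();
      // Fail fast on a size mismatch: copying tLength bytes out of a timestamp
      // of a different length would either throw or produce a misaligned frame.
      if (tLength != arg0.getT().length) {
        throw new IllegalStateException("Serialization error: Timepoint size discrepancy."
          + " tLength = " + tLength + ", arg0.getT().length = " + arg0.getT().length);
      }
      // Frame layout: [1 type byte][tLength timestamp bytes][payload bytes]
      byte[] buffer = new byte[1 + tLength + arg0.getP().length];
      buffer[0] = arg0.type;
      System.arraycopy(arg0.getT(), 0, buffer, 1, tLength);
      System.arraycopy(arg0.getP(), 0, buffer, 1 + tLength, arg0.getP().length);
      return buffer;
    }
  }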

  public static class Event extends SimpleImmutableEntry<byte[],byte[]> {

    Event(byte[] timestamp, byte[] payload){
      super(timestamp, payload);
    }
    public byte[] getT() { // get the timestamp
      return getKey();
    }
    public byte[] getP() { // get the payload
      return getValue();
    }
  }

  public static class EventGroup extends Event {
    public byte type;
    EventGroup(byte type, byte[] timestamp, byte[] payload){
      super(timestamp, payload);
      this.type = type;
    }
  }


  public static class SocketEventGroupStreamFunction implements SourceFunction<EventGroup> {

    private transient ServerSocket serverSocket;
    private int serverPort;
    private int dataLength;
    private byte[] inbuf;
    private byte[] timestamp;
    private byte[] payload;
    private int tLength = EKFFFTAES.getSizeTimepoint();
    private volatile boolean isRunning = true;

    public SocketEventGroupStreamFunction(int port, int length) {
      serverPort = port;
      dataLength = length;
      inbuf = new byte[1 + dataLength + tLength];
      timestamp = new byte[tLength];
      payload = new byte[dataLength];
    }

    @Override
    public void run(SourceContext<EventGroup> ctx) throws Exception {
      // Bind the listener once. A single client connection is served until the
      // job is cancelled or the client disconnects (readFully then throws).
      serverSocket = new ServerSocket(serverPort, 100, InetAddress.getByName("192.168.1.13"));
      serverSocket.setSoTimeout(1000000); // accept() times out after 1000 s
      System.out.println("Waiting for incoming connections on port " +
        serverSocket.getLocalPort() + "...");

      // try-with-resources closes the client socket and stream when run() exits.
      try (Socket client = serverSocket.accept();
           DataInputStream in = new DataInputStream(client.getInputStream())) {
        System.out.println("Just connected to " + client.getRemoteSocketAddress());

        while (isRunning) {
          // One record = 1 type byte + tLength timestamp bytes + dataLength payload bytes.
          in.readFully(inbuf, 0, inbuf.length);
          System.arraycopy(inbuf, 1, timestamp, 0, tLength);
          System.arraycopy(inbuf, 1 + tLength, payload, 0, dataLength);

          System.out.print("Got an event " + inbuf[0] + ": ");
          EKFFFTAES.displayElapsedTime(timestamp);

          // Clone the scratch arrays so each emitted record owns its data;
          // the next readFully() overwrites timestamp and payload.
          ctx.collect(new EventGroup(inbuf[0], timestamp.clone(), payload.clone()));
        }
      }
    }

    @Override
    public void cancel() {
      isRunning = false;
      ServerSocket theSocket = this.serverSocket;
      if (theSocket != null) {
        try {
          // Closing the listener unblocks a pending accept(). close() cannot
          // time out, so a single IOException handler is enough.
          theSocket.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }


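And finally, EKFFFTAES is my C++ library implementing the timestamping facility:


#include <jni.h>
#include <chrono>
#include <cstring>
#include <iostream>

// Assumes the javah-generated header (not shown) declares these functions
// extern "C" so that the JVM can resolve them by name.
int timePointLength = sizeof(std::chrono::system_clock::time_point);

JNIEXPORT jint JNICALL Java_eventProcessing_EKFFFTAES_getSizeTimepoint
  (JNIEnv *, jclass)
{
  return ::timePointLength;
}

JNIEXPORT void JNICALL Java_eventProcessing_EKFFFTAES_displayElapsedTime
  (JNIEnv *env, jclass, jbyteArray inArray)
{
  std::chrono::system_clock::time_point end =
    std::chrono::system_clock::now();
  jbyte *inCArray = env->GetByteArrayElements(inArray, NULL);
  if (inCArray == NULL) return; // the JVM could not provide the array contents
  std::chrono::system_clock::time_point start;
  std::memcpy (&start, inCArray, ::timePointLength);
  // Release the elements; JNI_ABORT skips the copy-back because nothing was modified.
  env->ReleaseByteArrayElements(inArray, inCArray, JNI_ABORT);
  // Print the elapsed time in microseconds.
  std::cout << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() << std::endl;
}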
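
For reference, the wire format shared by the serializer and the source's read loop above is one type byte, followed by the raw timestamp bytes, followed by the payload. The round trip can be checked in isolation with a sketch like the one below; the class name and helper methods are hypothetical, and an 8-byte dummy timestamp stands in for the JNI-provided time_point bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;

public class FrameLayoutDemo {
    /** Builds one frame: [type][timestamp bytes][payload bytes]. */
    static byte[] encode(byte type, byte[] timestamp, byte[] payload) {
        byte[] frame = new byte[1 + timestamp.length + payload.length];
        frame[0] = type;
        System.arraycopy(timestamp, 0, frame, 1, timestamp.length);
        System.arraycopy(payload, 0, frame, 1 + timestamp.length, payload.length);
        return frame;
    }

    /** Reads one fixed-size frame and returns { {type}, timestamp, payload }. */
    static byte[][] decode(DataInputStream in, int tLength, int dataLength) throws IOException {
        byte[] frame = new byte[1 + tLength + dataLength];
        in.readFully(frame); // blocks until the whole frame has arrived
        return new byte[][] {
            { frame[0] },
            Arrays.copyOfRange(frame, 1, 1 + tLength),
            Arrays.copyOfRange(frame, 1 + tLength, frame.length)
        };
    }

    public static void main(String[] args) throws IOException {
        byte[] timestamp = {1, 2, 3, 4, 5, 6, 7, 8}; // dummy stand-in for a time_point
        byte[] payload = {9, 10, 11};
        byte[] frame = encode((byte) 7, timestamp, payload);
        byte[][] parts = decode(new DataInputStream(new ByteArrayInputStream(frame)),
                                timestamp.length, payload.length);
        System.out.println(parts[0][0] + " " + Arrays.toString(parts[1])
            + " " + Arrays.toString(parts[2]));
    }
}
```

Because the frame has no length prefix, both sides have to agree on the timestamp and payload sizes in advance, which is why the source is constructed with a fixed payload length.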


Thank you,

Chao

