Out Of Memory Error in AsyncIO Sink and Performance improvement considerations


Prasanna kumar
Hi Community,

The Flink job is as follows (job parallelism is 4):
Kafka source -> Processor -> Async I/O sink (AWS SNS)
The job needs to handle a load of around 10k records per second, and latency should be kept as low as possible, since this is one of three stages that each event passes through.
What happened
After about 15k records are processed, the job fails with an OutOfMemoryError:

15:17:23,090 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - SNS SINK (7/8) (621fe31aa71a5aac49b49eec6c849596) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
at com.amazonaws.services.sns.AmazonSNSAsyncClient.publishAsync(AmazonSNSAsyncClient.java:1337)
at com.amazonaws.services.sns.AmazonSNSAsyncClient.publishAsync(AmazonSNSAsyncClient.java:1329)
at firstflinkpackage.AsyncSNSPublishLoad$AsyncHttpRequest.asyncInvoke(AsyncSNSPublishLoad.java:174)
at firstflinkpackage.AsyncSNSPublishLoad$AsyncHttpRequest.asyncInvoke(AsyncSNSPublishLoad.java:151)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:192)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
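The error is `unable to create new native thread`: the JVM could not start another OS thread. That limit comes from the operating system (per-process thread limits and memory for thread stacks), not from the heap or direct-memory settings. The trace shows it inside `ThreadPoolExecutor.addWorker`, reached from `publishAsync`, so the SNS client's own executor was trying to start a thread. The stdlib-only sketch below is a model, not the AWS SDK. It assumes, as the trace suggests, that each async client owns a private thread pool, and it builds one such "client" per record without ever shutting it down. `PerRecordClientLeak` and `extraThreadsAfter` are hypothetical names.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Model only: each "client" is a private single-thread pool standing in for the
// SDK client's internal executor. Like the client in the post, it is not shut
// down while records are processed, so its worker thread stays alive.
class PerRecordClientLeak {

    static int liveThreads() {
        return ManagementFactory.getThreadMXBean().getThreadCount();
    }

    /** Processes `records` records, building a fresh "client" for each one,
     *  and returns how many live threads were added. */
    static int extraThreadsAfter(int records) {
        int before = liveThreads();
        List<ExecutorService> clients = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            ExecutorService perRecordClient = Executors.newFixedThreadPool(1);
            perRecordClient.submit(() -> { /* pretend to publish one message */ });
            clients.add(perRecordClient); // kept only so the demo can clean up
        }
        int extra = liveThreads() - before;
        // Demo cleanup; the per-record clients in the post are never shut down.
        for (ExecutorService client : clients) {
            client.shutdownNow();
        }
        return extra;
    }

    public static void main(String[] args) {
        System.out.println("extra live threads after 200 records: " + extraThreadsAfter(200));
    }
}
```

In this model every record leaves at least one idle thread behind, and a real SDK client may hold more. At thousands of records per second the thread count therefore grows without bound, and raising direct memory would likely not change that.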

Async I/O call and function:

// unorderedWait(input, function, timeout = 15, capacity = 100); the TimeUnit argument is not shown in the post
DataStream<Tuple2<String, Message>> resultStream =
        AsyncDataStream.unorderedWait(sourceStream, new AsyncSNSPublishLoad.AsyncHttpRequest(), 15,
                100).name("SNS SINK");

private static class AsyncHttpRequest extends RichAsyncFunction<Message, Tuple2<String, Message>> {

    @Override
    public void asyncInvoke(Message eventMessage,
                            ResultFuture<Tuple2<String, Message>> resultFuture)
            throws Exception {

        // A new SNS client, with its own internal executor, is built for every record
        // and never shut down.
        AmazonSNSAsyncClientBuilder snsClientBuilder = AmazonSNSAsyncClientBuilder.standard()
                .withCredentials(new DefaultAWSCredentialsProviderChain());
        AmazonSNSAsync snsClient = snsClientBuilder.build();

        String attributeContextID = eventMessage.getKey();

        ObjectMapper mapper = new ObjectMapper();
        // Converting the object to a JSON string
        String JsonString = mapper.writeValueAsString(eventMessage);

        PublishRequest request = // (rest of this line is truncated in the post)
        Future<PublishResult> snsResultFuture = snsClient.publishAsync(request);

        // Runs on Java's common ForkJoinPool and blocks there until SNS responds
        CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    PublishResult result = snsResultFuture.get(5, // (truncated in the post)
                    System.out.println("Received SNS message id: " + result.getMessageId());
                    return result.getMessageId();
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    // Normally handled explicitly.
                    System.out.println("Got exception when sending SNS message: " + e.getMessage());
                    return null;
                }
            }
        }).thenAccept((String snsResult) -> {
            System.out.println("Accepted " + snsResult);
            resultFuture.complete(Collections.singleton(new Tuple2<>(snsResult, eventMessage)));
        });
    }
}


Flink dashboard at the time of the crash:
Task Manager memory = 6 GB
Job Manager memory = 1 GB



      1) Here I see that the direct memory capacity is fully used. I am planning to increase it and test again. Should the capacity be increased here?
          I read in the Flink documentation that the capacity should be kept low to avoid memory overflow. But we expect a peak load of 30k-50k records per second (for around 8 hours per day) within the next year. What configuration would you recommend for designing such a system?
      2) Another thought I have is to run the Async I/O operator with two or three times the parallelism of the Kafka source.
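For point 2, the `capacity` argument of `unorderedWait` (100 in the call above) caps how many requests each parallel instance of the async operator may have in flight. Little's law (requests in flight = arrival rate times average latency) then gives a rough upper bound of parallelism times capacity divided by latency, in records per second, and conversely the capacity needed for a target rate. The sketch below is arithmetic only. The 20 ms latency is a hypothetical figure to be replaced with a measured one, and a larger capacity also means more pending records held in memory per instance.

```java
// Back-of-the-envelope sizing with Little's law (in-flight = rate x latency).
// All inputs are assumptions; replace them with measured values.
class AsyncCapacitySizing {

    /** Upper bound on records per second when every parallel instance keeps
     *  `capacity` requests in flight and each request takes `avgLatencyMillis`. */
    static double maxThroughputPerSecond(int parallelism, int capacity, double avgLatencyMillis) {
        return parallelism * capacity * 1000.0 / avgLatencyMillis;
    }

    /** Smallest per-instance capacity that can sustain `targetPerSecond`. */
    static int requiredCapacity(double targetPerSecond, int parallelism, double avgLatencyMillis) {
        double inFlightTotal = targetPerSecond * avgLatencyMillis / 1000.0;
        return (int) Math.ceil(inFlightTotal / parallelism);
    }

    public static void main(String[] args) {
        double assumedLatencyMillis = 20.0; // hypothetical SNS publish latency
        System.out.println(maxThroughputPerSecond(4, 100, assumedLatencyMillis)); // 20000.0
        System.out.println(requiredCapacity(50_000, 4, assumedLatencyMillis));    // 250
    }
}
```

With these assumed numbers, parallelism 4 and capacity 100 top out around 20k records per second, while a 50k/s peak would need a capacity of about 250 per instance or a correspondingly higher parallelism.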

Let me know your thoughts.


Re: Out Of Memory Error in AsyncIO Sink and Performance improvement considerations

Arvid Heise
Hi Prasanna,

Could you please try moving all the expensive operations (both memory- and CPU-wise) into the open() method of the async function?

I mean the following calls, which are probably leaking resources:

AmazonSNSAsyncClientBuilder snsClientBuilder = AmazonSNSAsyncClientBuilder.standard()
.withCredentials(new DefaultAWSCredentialsProviderChain());
AmazonSNSAsync snsClient = snsClientBuilder.build();

ObjectMapper mapper = new ObjectMapper();
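A stdlib-only sketch of the suggested change, assuming the SNS client's cost is dominated by its internal thread pool: the expensive object is created once in `open()`, reused by every `asyncInvoke()`, and released in `close()`. `SharedClientFunction` and the pool size of 4 are hypothetical. In the real job the field would hold the `AmazonSNSAsync` client (and the `ObjectMapper`), built in `RichAsyncFunction.open(Configuration)` and shut down in `close()`.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Model only: mirrors the open() / asyncInvoke() / close() lifecycle of a rich
// async function. `sharedClient` stands in for the AmazonSNSAsync client (and
// any other expensive helper such as the ObjectMapper).
class SharedClientFunction {

    private ExecutorService sharedClient;

    // Called once per parallel instance before any record arrives.
    void open() {
        sharedClient = Executors.newFixedThreadPool(4); // hypothetical pool size
    }

    // Called once per record; only cheap, per-record work happens here.
    CompletableFuture<String> asyncInvoke(String record) {
        return CompletableFuture.supplyAsync(() -> "published:" + record, sharedClient);
    }

    // Called once when the task shuts down; releases the client's threads.
    void close() {
        sharedClient.shutdown();
    }

    /** Processes `records` records through one instance and returns how many
     *  live threads were added while doing so. */
    static int extraThreadsAfter(int records) {
        int before = ManagementFactory.getThreadMXBean().getThreadCount();
        SharedClientFunction fn = new SharedClientFunction();
        fn.open();
        for (int i = 0; i < records; i++) {
            fn.asyncInvoke("record-" + i).join(); // join only to keep the demo sequential
        }
        int extra = ManagementFactory.getThreadMXBean().getThreadCount() - before;
        fn.close();
        return extra;
    }

    public static void main(String[] args) {
        System.out.println("extra live threads after 1000 records: " + extraThreadsAfter(1000));
    }
}
```

Here the thread count stays at the pool size no matter how many records pass through, which is consistent with the OutOfMemoryError disappearing once the setup code was moved into `open()`, as reported later in the thread.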

Additionally, your use of CompletableFuture looks odd. I'd use the SNS AsyncHandler to process the results directly in the SNS client's thread pool instead of in Java's common ForkJoinPool.

snsClient.publishAsync(request, new AsyncHandler<PublishRequest, PublishResult>() {
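A sketch of the callback style suggested above. With the real SDK, the handler is an `AsyncHandler<PublishRequest, PublishResult>` passed to `publishAsync(request, handler)`; `onSuccess` would call `resultFuture.complete(...)` and `onError` would call `resultFuture.completeExceptionally(...)`. In the original code, `CompletableFuture.supplyAsync` without an executor runs on the common ForkJoinPool, and each task there blocks in `snsResultFuture.get(...)` until SNS answers. To stay runnable without the AWS or Flink jars, the block below uses hypothetical stand-ins: `Handler` mirrors the shape of the SDK's `AsyncHandler`, `FakeSnsClient` replaces the SNS client, and a `CompletableFuture<Collection<String>>` replaces Flink's `ResultFuture`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Model only. `Handler` mirrors the shape of the AWS SDK v1 AsyncHandler
// (onError / onSuccess); `FakeSnsClient` is a hypothetical stand-in for the SNS
// client, and CompletableFuture<Collection<String>> stands in for Flink's ResultFuture.
class CallbackCompletion {

    interface Handler<REQ, RES> {
        void onError(Exception exception);
        void onSuccess(REQ request, RES result);
    }

    static class FakeSnsClient {
        // The client's own worker threads run the callbacks.
        private final ExecutorService pool = Executors.newFixedThreadPool(2);

        void publishAsync(String message, Handler<String, String> handler) {
            pool.execute(() -> {
                if (message.isEmpty()) {
                    handler.onError(new IllegalArgumentException("empty message"));
                } else {
                    handler.onSuccess(message, "msg-id-" + message.length());
                }
            });
        }

        void shutdown() {
            pool.shutdown();
        }
    }

    /** Completes the result directly inside the callback: no extra thread
     *  blocks on Future.get() while waiting for the publish to finish. */
    static CompletableFuture<Collection<String>> publish(FakeSnsClient client, String message) {
        CompletableFuture<Collection<String>> resultFuture = new CompletableFuture<>();
        client.publishAsync(message, new Handler<String, String>() {
            @Override
            public void onError(Exception exception) {
                resultFuture.completeExceptionally(exception);
            }

            @Override
            public void onSuccess(String request, String messageId) {
                resultFuture.complete(Collections.singleton(messageId));
            }
        });
        return resultFuture;
    }

    public static void main(String[] args) {
        FakeSnsClient client = new FakeSnsClient();
        System.out.println(publish(client, "hello").join()); // [msg-id-5]
        client.shutdown();
    }
}
```

Because the result is completed from the client's callback, no thread is parked per in-flight request, and failures reach the async operator through `completeExceptionally` instead of being turned into a `null` message id.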

(In reply to Prasanna kumar's message of Wed, Sep 30, 2020 at 8:10 PM.)



Arvid Heise | Senior Java Developer

Follow us @VervericaData


Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time


Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany


Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Out Of Memory Error in AsyncIO Sink and Performance improvement considerations

Prasanna kumar

Once I moved the static code into the open() function, the OutOfMemoryError no longer occurs.


(In reply to Arvid Heise's message of Thu, Oct 1, 2020 at 3:31 AM.)