Hi Community,

Following is the Flink job. Job parallelism is 4.

Source Kafka -> Processor -> AsyncIO Sink (AWS SNS)

Aim: The job needs to handle a load of around 10k records per second, and latency should be kept as low as possible, since this is one of the 3 stages the event passes through.

What happened: once about 15k records have been processed, the job fails with an OutOfMemoryError:

    15:17:23,090 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - SNS SINK (7/8) (621fe31aa71a5aac49b49eec6c849596) switched from RUNNING to FAILED.
    java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:717)
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367)
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
        at com.amazonaws.services.sns.AmazonSNSAsyncClient.publishAsync(AmazonSNSAsyncClient.java:1337)
        at com.amazonaws.services.sns.AmazonSNSAsyncClient.publishAsync(AmazonSNSAsyncClient.java:1329)
        at firstflinkpackage.AsyncSNSPublishLoad$AsyncHttpRequest.asyncInvoke(AsyncSNSPublishLoad.java:174)
        at firstflinkpackage.AsyncSNSPublishLoad$AsyncHttpRequest.asyncInvoke(AsyncSNSPublishLoad.java:151)
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:192)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)

Async IO call and method:

    DataStream<Tuple2<String,Message>> resultStream =

    private static class AsyncHttpRequest extends RichAsyncFunction<Message, Tuple2<String,

[Flink Dashboard at the time of the crash]

Task Manager memory = 6GB, Job Manager memory = 1GB.
1) Here I see that the direct memory capacity is used up entirely. I am planning to increase it and test. Should the capacity be increased here? I read in the Flink documentation that capacity should be kept low to prevent memory overflow. But we are looking at a peak load of 30k-50k records per second (for around 8 hours per day) within the next year. What configuration would you recommend for designing such a system?

2) One thought I have is to run double or triple the number of Async IO operators relative to the Kafka source operators. Let me know your thoughts on this.

Prasanna.
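If the capacity in question is the `capacity` argument of Flink's `AsyncDataStream.unorderedWait` / `orderedWait`, which bounds how many async requests each parallel subtask keeps in flight, a rough starting point is Little's law: requests in flight = arrival rate x average request latency. The plain-Java sketch below assumes a 50 ms average SNS publish latency purely for illustration (measure the real value), and `AsyncCapacityEstimate` is a hypothetical helper, not a Flink API.

```java
// Hypothetical helper: rough Async I/O sizing via Little's law (L = lambda * W).
final class AsyncCapacityEstimate {
    private AsyncCapacityEstimate() {}

    // Average number of requests in flight for a given throughput and per-request latency.
    static int inFlight(double recordsPerSecond, double latencyMillis) {
        return (int) Math.ceil(recordsPerSecond * latencyMillis / 1000.0);
    }

    // Split the total across the async operator's parallel subtasks (rounded up),
    // since each subtask has its own capacity-bounded queue.
    static int capacityPerSubtask(double recordsPerSecond, double latencyMillis, int parallelism) {
        return (int) Math.ceil((double) inFlight(recordsPerSecond, latencyMillis) / parallelism);
    }

    public static void main(String[] args) {
        // Assumed 50 ms average publish latency; replace with a measured value.
        System.out.println(capacityPerSubtask(10_000, 50, 4)); // 125
        System.out.println(capacityPerSubtask(50_000, 50, 4)); // 625
    }
}
```

Raising the async operator's parallelism (question 2) lowers the per-subtask capacity needed for the same throughput. If each subtask builds its own client with its own thread pool, though, every extra subtask also adds threads, which is worth keeping in mind given the `unable to create new native thread` failure.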
Hi Prasanna,

could you please try moving all the expensive operations (both memory- and CPU-wise) into the open() method of the async function? I mean these functions here, which are probably leaking resources.
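The trace shows the crash inside `ThreadPoolExecutor.addWorker`, reached from `AmazonSNSAsyncClient.publishAsync`, i.e. while starting a new thread. If an async client that owns its own thread pool is created repeatedly instead of once, threads keep accumulating until the OS refuses to create more. The plain-Java sketch below illustrates the create-once pattern; `SimulatedAsyncClient` and `PublishFunction` are hypothetical stand-ins (not the AWS SDK or Flink API), and `PublishFunction` only mimics the `open()` / `asyncInvoke()` / `close()` lifecycle of Flink's `RichAsyncFunction`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for an async client that owns a private thread pool (NOT the AWS SDK API).
class SimulatedAsyncClient implements AutoCloseable {
    static final AtomicInteger LIVE_CLIENTS = new AtomicInteger();
    static final AtomicInteger PEAK_CLIENTS = new AtomicInteger();
    private final ExecutorService pool;

    SimulatedAsyncClient(int threads) {
        this.pool = Executors.newFixedThreadPool(threads); // each instance brings its own threads
        int live = LIVE_CLIENTS.incrementAndGet();
        PEAK_CLIENTS.accumulateAndGet(live, Math::max);
    }

    // "Publishes" on the client's own pool and hands the result to the callback.
    void publishAsync(String message, Consumer<String> onSuccess) {
        pool.execute(() -> onSuccess.accept("published:" + message));
    }

    @Override
    public void close() {
        pool.shutdown(); // lets the pool's threads exit once queued work is done
        LIVE_CLIENTS.decrementAndGet();
    }
}

// Hypothetical function mimicking RichAsyncFunction's lifecycle:
// open() once per parallel instance, asyncInvoke() per record, close() at teardown.
class PublishFunction {
    private SimulatedAsyncClient client;

    void open() {
        client = new SimulatedAsyncClient(4); // expensive setup happens once, not per record
    }

    void asyncInvoke(String record, Consumer<String> onResult) {
        client.publishAsync(record, onResult); // reuses the same client and thread pool
    }

    void close() {
        if (client != null) {
            client.close();
        }
    }

    // Pushes `records` records through one function instance; returns how many results arrived.
    static long runDemo(int records) {
        PublishFunction fn = new PublishFunction();
        fn.open();
        CountDownLatch done = new CountDownLatch(records);
        try {
            for (int i = 0; i < records; i++) {
                fn.asyncInvoke("event-" + i, result -> done.countDown());
            }
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            fn.close();
        }
        return records - done.getCount();
    }
}
```

In a real job the same shape would map onto building the SNS client in `RichAsyncFunction.open(Configuration)` and calling the client's `shutdown()` in `close()`, so each parallel subtask holds exactly one client no matter how many records it processes.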
Additionally, your use of CompletableFuture looks odd. I'd use the SNS AsyncHandler to process the results directly in the SNS client's thread pool instead of in Java's common ForkJoinPool:

    snsClient.publishAsync(request, new AsyncHandler() {

On Wed, Sep 30, 2020 at 8:10 PM Prasanna kumar <[hidden email]> wrote:
--
Arvid Heise | Senior Java Developer
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
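Arvid's second point, reacting to the result inside the client's own callback, can be sketched in plain Java. This assumes the original code wrapped the SDK's `Future` in something like `CompletableFuture.supplyAsync(...)`, which by default runs on the common ForkJoinPool and ties up one of its threads per pending response. `Handler` and `CallbackClient` are hypothetical stand-ins shaped like the SDK v1 `AsyncHandler` (`onError(Exception)`, `onSuccess(request, result)`), not the real types; the point is that the future is completed from inside the callback, so no extra thread waits.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical callback interface shaped like the AWS SDK v1 AsyncHandler (NOT the real type).
interface Handler<REQ, RES> {
    void onError(Exception exception);
    void onSuccess(REQ request, RES result);
}

// Hypothetical client that runs each call on its own pool and reports back via the handler.
class CallbackClient implements AutoCloseable {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    void publishAsync(String message, Handler<String, String> handler) {
        pool.execute(() -> {
            if (message.isEmpty()) {
                handler.onError(new IllegalArgumentException("empty message"));
            } else {
                handler.onSuccess(message, "id-" + message);
            }
        });
    }

    @Override
    public void close() {
        pool.shutdown();
    }
}

final class CallbackBridge {
    private CallbackBridge() {}

    // Completes the future from inside the client's callback thread, so no extra
    // thread (e.g. from the common ForkJoinPool) blocks waiting on the response.
    static CompletableFuture<String> publish(CallbackClient client, String message) {
        CompletableFuture<String> result = new CompletableFuture<>();
        client.publishAsync(message, new Handler<String, String>() {
            @Override
            public void onError(Exception exception) {
                result.completeExceptionally(exception);
            }

            @Override
            public void onSuccess(String request, String messageId) {
                result.complete(messageId);
            }
        });
        return result;
    }
}
```

With the real SDK, the handler would be a `new AsyncHandler<PublishRequest, PublishResult>() { ... }` passed to `snsClient.publishAsync(request, handler)`, and inside the Flink function the two callbacks would call `resultFuture.complete(...)` and `resultFuture.completeExceptionally(...)` instead of completing a `CompletableFuture`.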
Arvid,

Once I moved the static code into the open() function, there is no more out-of-memory error.

Thanks,
Prasanna.

On Thu, Oct 1, 2020 at 3:31 AM Arvid Heise <[hidden email]> wrote: