Re: Flink 1.10 Out of memory

Posted by Xintong Song on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-1-10-Out-of-memory-tp34406p34660.html

True. Thanks for the clarification.

Thank you~

Xintong Song



On Fri, Apr 24, 2020 at 5:21 PM Stephan Ewen <[hidden email]> wrote:
I think native methods are not in a forked process. It is just a malloc() call that failed, probably an I/O buffer or so.
This might mean that there really is no native memory available any more, meaning the process has hit its limit. In any case, a bit more JVM overhead should solve this.

On Fri, Apr 24, 2020 at 10:24 AM Xintong Song <[hidden email]> wrote:
I might be wrong about how JNI works. Isn't a native method always executed in another process?

I was searching for the java error message "Cannot allocate memory", and it seems this happens when JVM cannot allocate memory from the OS. Given the exception is thrown from calling a native method, I think the problem is that not enough native memory can be allocated for executing the native method.

Thank you~

Xintong Song



On Fri, Apr 24, 2020 at 3:40 PM Stephan Ewen <[hidden email]> wrote:
@Xintong - out of curiosity, where do you see that this tries to fork a process? I must be overlooking something, I could only see the native method call.

On Fri, Apr 24, 2020 at 4:53 AM Xintong Song <[hidden email]> wrote:
@Stephan,
I don't think so. If JVM hits the direct memory limit, you should see the error message "OutOfMemoryError: Direct buffer memory".

Thank you~

Xintong Song
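
The distinction drawn above, between hitting the JVM direct memory limit and failing a native allocation, can be sketched as a small message classifier. This is only an illustration: the class, enum, and method names are hypothetical, and the mapping relies on the usual HotSpot `OutOfMemoryError` messages plus the Linux ENOMEM text "Cannot allocate memory" seen in this thread.

```java
public class OomMessageSketch {

    /** Rough guess at which memory area an error message points to. */
    public enum Pool { JAVA_HEAP, DIRECT_MEMORY, METASPACE, NATIVE_ALLOCATION, UNKNOWN }

    /** Hypothetical helper: classifies a throwable by its message text. */
    public static Pool classify(Throwable t) {
        String msg = t.getMessage() == null ? "" : t.getMessage();
        if (t instanceof OutOfMemoryError) {
            // Limits enforced by the JVM itself surface as OutOfMemoryError.
            if (msg.contains("Direct buffer memory")) return Pool.DIRECT_MEMORY;
            if (msg.contains("Java heap space")) return Pool.JAVA_HEAP;
            if (msg.contains("Metaspace")) return Pool.METASPACE;
        }
        // The OS refusing an allocation (ENOMEM) is reported with this text,
        // here wrapped in a java.io.IOException by the native write call.
        if (msg.contains("Cannot allocate memory")) return Pool.NATIVE_ALLOCATION;
        return Pool.UNKNOWN;
    }

    public static void main(String[] args) {
        System.out.println(classify(new java.io.IOException("Cannot allocate memory")));
        System.out.println(classify(new OutOfMemoryError("Direct buffer memory")));
    }
}
```

With the exception from this thread, the classifier lands on the native allocation case rather than the direct memory case, which matches the reasoning above.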



On Thu, Apr 23, 2020 at 6:11 PM Stephan Ewen <[hidden email]> wrote:
@Xintong and @Lasse could it be that the JVM hits the "Direct Memory" limit here?
Would increasing the "taskmanager.memory.framework.off-heap.size" help?
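
For context on why that option relates to the direct memory limit: in the Flink 1.10 memory model, as far as the documentation describes it, the JVM direct memory limit is derived as the sum of framework off-heap, task off-heap, and network memory. A minimal sketch of that sum follows; the class and method names are hypothetical and the input values are made-up examples, not numbers from this job.

```java
public class DirectMemoryLimitSketch {

    /**
     * Assumption: the direct memory limit is framework off-heap
     * plus task off-heap plus network memory (all in MB here).
     */
    public static long maxDirectMemoryMb(long frameworkOffHeapMb,
                                         long taskOffHeapMb,
                                         long networkMb) {
        return frameworkOffHeapMb + taskOffHeapMb + networkMb;
    }

    public static void main(String[] args) {
        // Hypothetical values: 128 MB framework off-heap, no task off-heap,
        // 200 MB network memory.
        System.out.println(maxDirectMemoryMb(128, 0, 200) + " MB");
    }
}
```

Raising the framework off-heap size therefore raises the direct memory limit by the same amount, which only helps if the failure is actually a direct buffer limit.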

On Mon, Apr 20, 2020 at 11:02 AM Zahid Rahman <[hidden email]> wrote:
As you can see from the task manager tab of flink web dashboard 

Physical Memory:3.80 GB
JVM Heap Size:1.78 GB
Flink Managed Memory:128 MB

Flink is only using 128 MB of managed memory, which can easily cause an OOM error.

These are DEFAULT settings.

I dusted off an old laptop, so it only has 3.8 GB of RAM.

What do your job metrics say?

On Mon, 20 Apr 2020, 07:26 Xintong Song, <[hidden email]> wrote:
Hi Lasse,

From what I understand, your problem is that the JVM tries to fork some native process (if you look at the exception stack, the root exception is thrown from a native method) but there's not enough memory for doing that. This could happen when either Mesos is using cgroup strict mode for memory control, or there's no more memory on the machine. Flink cannot prevent native processes from using more memory. It can only reserve a certain amount of memory for such native usage when requesting worker memory from the deployment environment (in your case Mesos) and allocating Java heap / direct memory.
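
The check described above (walk to the root exception and see whether it was thrown from a native method) can be sketched in plain Java. The class and method names are hypothetical, and the exception is rebuilt by hand from the frame in the reported trace. Later replies in this thread clarify that the frame is an ordinary native method call rather than a forked process.

```java
import java.io.IOException;

public class RootCauseSketch {

    /** Follows getCause() links until the innermost throwable is reached. */
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** True if the top stack frame of the throwable is a native method. */
    public static boolean thrownFromNative(Throwable t) {
        StackTraceElement[] frames = t.getStackTrace();
        return frames.length > 0 && frames[0].isNativeMethod();
    }

    public static void main(String[] args) {
        // Hand-built stand-in for the reported failure; a line number of -2
        // marks a frame as "Native Method".
        IOException root = new IOException("Cannot allocate memory");
        root.setStackTrace(new StackTraceElement[] {
            new StackTraceElement("java.io.FileOutputStream", "writeBytes", null, -2)
        });
        Exception wrapped = new Exception("Could not materialize checkpoint", root);

        Throwable cause = rootCause(wrapped);
        System.out.println(cause.getMessage() + " / native: " + thrownFromNative(cause));
    }
}
```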

My suggestion is to try increasing the JVM overhead configuration. You can leverage the configuration options 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in the documentation[1].
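
To get a feel for how much JVM overhead those three options yield, here is a minimal sketch of the derivation as the Flink 1.10 documentation describes it: a fraction of the total process memory, clamped between min and max. The class and method names are hypothetical, and the defaults used in `main` (fraction 0.1, min 192 MB, max 1 GB) are what I believe 1.10 ships with, so please verify them against your version.

```java
public class JvmOverheadSketch {

    /**
     * Assumption: overhead = fraction * process size, clamped to [min, max].
     * All sizes are in MB.
     */
    public static long deriveOverheadMb(long processMb, double fraction,
                                        long minMb, long maxMb) {
        long byFraction = Math.round(processMb * fraction);
        return Math.max(minMb, Math.min(maxMb, byFraction));
    }

    public static void main(String[] args) {
        // taskmanager.memory.process.size: 2200m with the assumed defaults.
        System.out.println(deriveOverheadMb(2200, 0.1, 192, 1024) + " MB");
        // Same process size with the fraction raised to 0.2.
        System.out.println(deriveOverheadMb(2200, 0.2, 192, 1024) + " MB");
    }
}
```

Under these assumptions, a 2200m process gets 220 MB of overhead by default, and raising the fraction to 0.2 doubles that to 440 MB at the expense of the other memory components.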

On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman <[hidden email]> wrote:

On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <[hidden email]> wrote:
Hi.

We have migrated to Flink 1.10 and are facing out-of-memory exceptions. Hopefully someone can point us in the right direction.

We have a job that uses broadcast state, and we sometimes run out of memory when it creates a savepoint. See the stack trace below.
We have assigned 2.2 GB per task manager and configured taskmanager.memory.process.size: 2200m
In Flink 1.9 our container was terminated because of OOM, so 1.10 does a better job, but it is still not working: the task manager leaks memory on each OOM and is finally killed by Mesos.


Any idea what we can do to figure out what settings we need to change?

Thanks in advance

Lasse Nedergaard 


WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could not close the state stream for s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
java.io.IOException: Cannot allocate memory
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
    at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
    at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277)
    at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
    at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
    at org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:167)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:83)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 3509 of job fc9318cc236d09f0bfd994f138896d6c.
org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 3509 for operator Feature extraction (8/12).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Cannot allocate memory
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
    ... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Cannot allocate memory
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:95)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:77)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:125)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.write(HadoopDataOutputStream.java:47)
    at org.apache.flink.core.fs.FSDataOutputStreamWrapper.write(FSDataOutputStreamWrapper.java:66)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:220)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeBytes(DataOutputEncoder.java:92)
    at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeString(DataOutputEncoder.java:113)
    at org.apache.avro.io.Encoder.writeString(Encoder.java:130)
    at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:323)
    at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:281)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:185)
    at org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)