Flink 1.10 Out of memory


Flink 1.10 Out of memory

Lasse Nedergaard-2
Hi.

We have migrated to Flink 1.10 and are facing out-of-memory exceptions; hopefully someone can point us in the right direction.

We have a job that uses broadcast state, and we sometimes run out of memory when it creates a savepoint. See the stack trace below.
We have assigned 2.2 GB per task manager and configured taskmanager.memory.process.size: 2200m
In Flink 1.9 our container was terminated because of OOM, so 1.10 does a better job, but it is still not working: the task manager leaks memory on each OOM and is finally killed by Mesos.
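For reference, the memory setting above corresponds to a flink-conf.yaml entry along these lines (a minimal sketch, not our full configuration):

```yaml
# Total memory of the TaskManager process. In Flink 1.10 the JVM heap,
# managed memory, network buffers, metaspace and JVM overhead are all
# carved out of this single budget.
taskmanager.memory.process.size: 2200m
```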


Any idea what we can do to figure out what settings we need to change?

Thanks in advance

Lasse Nedergaard 


WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could not close the state stream for s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
java.io.IOException: Cannot allocate memory
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
    at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
    at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277)
    at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
    at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
    at org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:167)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:83)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 3509 of job fc9318cc236d09f0bfd994f138896d6c.
org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 3509 for operator Feature extraction (8/12).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Cannot allocate memory
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
    ... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Cannot allocate memory
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:95)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:77)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:125)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.write(HadoopDataOutputStream.java:47)
    at org.apache.flink.core.fs.FSDataOutputStreamWrapper.write(FSDataOutputStreamWrapper.java:66)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:220)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeBytes(DataOutputEncoder.java:92)
    at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeString(DataOutputEncoder.java:113)
    at org.apache.avro.io.Encoder.writeString(Encoder.java:130)
    at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:323)
    at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:281)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:185)
    at org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)

[Attachment: Screenshot 2020-04-17 at 15.03.35.png (72K)]

Re: Flink 1.10 Out of memory

Zahid Rahman

On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <[hidden email]> wrote:

Re: Flink 1.10 Out of memory

Xintong Song
Hi Lasse,

From what I understand, your problem is that the JVM tries to fork some native process (if you look at the exception stack, the root exception is thrown from a native method) but there is not enough memory to do so. This can happen either when Mesos is using cgroup strict mode for memory control, or when there is no more memory available on the machine. Flink cannot prevent native processes from using more memory. It can only reserve a certain amount of memory for such native usage when requesting worker memory from the deployment environment (in your case Mesos) and allocating the Java heap / direct memory.

My suggestion is to try increasing the JVM overhead configuration. You can leverage the configuration options 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in the documentation[1].
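A sketch of what that could look like in flink-conf.yaml (the values here are illustrative, not a recommendation; tune them against your workload and container budget):

```yaml
# Reserve more of taskmanager.memory.process.size for JVM/native overhead.
# The 1.10 defaults are min=192m, max=1g, fraction=0.1.
taskmanager.memory.jvm-overhead.min: 256m
taskmanager.memory.jvm-overhead.max: 1g
taskmanager.memory.jvm-overhead.fraction: 0.2
```

Note that raising the overhead shrinks what is left for the heap and managed memory within the same process size.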

On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman <[hidden email]> wrote:


Re: Flink 1.10 Out of memory

Lasse Nedergaard-2
Hi

Thanks for the reply. We will try it out and let everyone know.

Best regards
Lasse Nedergaard


On 20 Apr 2020, at 08:26, Xintong Song <[hidden email]> wrote:



Re: Flink 1.10 Out of memory

Zahid Rahman
In reply to this post by Xintong Song
As you can see from the task manager tab of the Flink web dashboard:

Physical Memory: 3.80 GB
JVM Heap Size: 1.78 GB
Flink Managed Memory: 128 MB

Flink is only using 128 MB of managed memory, which can easily cause an OOM error.

These are the DEFAULT settings.

I dusted off an old laptop, so it only has 3.8 GB of RAM.

What do your job metrics say?

On Mon, 20 Apr 2020, 07:26 Xintong Song, <[hidden email]> wrote:
Hi Lasse,

From what I understand, your problem is that the JVM tries to fork some native process (if you look at the exception stack, the root exception is thrown from a native method), but there is not enough memory for doing that. This can happen either when Mesos is using cgroup strict mode for memory control, or when there is no more memory left on the machine. Flink cannot prevent native processes from using more memory. It can only reserve a certain amount of memory for such native usage when requesting worker memory from the deployment environment (in your case Mesos) and allocating Java heap / direct memory.

My suggestion is to try increasing the JVM overhead configuration. You can leverage the configuration options 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in the documentation[1].
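Xintong's sizing advice can be illustrated with a short calculation. The sketch below assumes the documented Flink 1.10 defaults for the JVM overhead options (fraction 0.1, min 192 MB, max 1 GB); verify them against your version's documentation before relying on the numbers.

```python
def jvm_overhead_mb(process_size_mb: float,
                    fraction: float = 0.1,
                    min_mb: float = 192,
                    max_mb: float = 1024) -> float:
    """Flink derives the JVM overhead as a fraction of the total process
    size, clamped between the configured min and max bounds."""
    return min(max(process_size_mb * fraction, min_mb), max_mb)

# With taskmanager.memory.process.size: 2200m from this thread, the
# defaults reserve roughly 220 MB for native/overhead usage.
default_overhead = jvm_overhead_mb(2200)

# Raising taskmanager.memory.jvm-overhead.min (512m is a hypothetical
# value) leaves more headroom for native allocations.
raised_overhead = jvm_overhead_mb(2200, min_mb=512)

print(default_overhead, raised_overhead)
```

Note that whatever is reserved as overhead comes out of the same 2200 MB process budget, so the heap and managed memory shrink correspondingly.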


Re: Flink 1.10 Out of memory

Stephan Ewen
@Xintong and @Lasse could it be that the JVM hits the "Direct Memory" limit here?
Would increasing the "taskmanager.memory.framework.off-heap.size" help?
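For reference, both knobs mentioned in this thread are set in flink-conf.yaml. The values below are illustrative starting points only, not recommendations from the thread:

```yaml
# flink-conf.yaml -- illustrative values, tune for your own workload
taskmanager.memory.process.size: 2200m
# Xintong's suggestion: reserve more of the process budget for native usage
taskmanager.memory.jvm-overhead.min: 512m
# Stephan's question: does a larger framework off-heap budget help?
taskmanager.memory.framework.off-heap.size: 256m
```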


Re: Flink 1.10 Out of memory

Xintong Song
@Stephan,
I don't think so. If the JVM hits the direct memory limit, you should see the error message "OutOfMemoryError: Direct buffer memory".

Thank you~

Xintong Song
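The distinction Xintong draws can be made mechanical when grepping task manager logs: each memory pool fails with a characteristic message. A minimal sketch follows; the quoted strings are the standard JVM/OS error messages, but the message-to-pool mapping is illustrative, not exhaustive.

```python
def classify_memory_error(message: str) -> str:
    """Map a memory-related error message to the pool it most likely points at."""
    if "Direct buffer memory" in message:
        return "direct"      # JVM direct memory limit (-XX:MaxDirectMemorySize)
    if "Java heap space" in message:
        return "heap"        # JVM heap limit (-Xmx)
    if "Metaspace" in message:
        return "metaspace"   # class metadata limit (-XX:MaxMetaspaceSize)
    if "Cannot allocate memory" in message:
        return "native"      # the OS itself refused the allocation (ENOMEM)
    return "unknown"

# The trace in this thread carries "java.io.IOException: Cannot allocate
# memory" -- a native/OS-level failure, which is why the JVM overhead
# budget, not the direct memory limit, is the suspect here.
```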




Re: Flink 1.10 Out of memory

Stephan Ewen
@Xintong - out of curiosity, where do you see that this tries to fork a process? I must be overlooking something; I could only see the native method call.

On Fri, Apr 24, 2020 at 4:53 AM Xintong Song <[hidden email]> wrote:
@Stephan,
I don't think so. If JVM hits the direct memory limit, you should see the error message "OutOfMemoryError: Direct buffer memory".

Thank you~

Xintong Song



On Thu, Apr 23, 2020 at 6:11 PM Stephan Ewen <[hidden email]> wrote:
@Xintong and @Lasse could it be that the JVM hits the "Direct Memory" limit here?
Would increasing the "taskmanager.memory.framework.off-heap.size" help?

On Mon, Apr 20, 2020 at 11:02 AM Zahid Rahman <[hidden email]> wrote:
As you can see from the task manager tab of flink web dashboard 

Physical Memory:3.80 GB
JVM Heap Size:1.78 GB
Flink Managed Memory:128 MB

Flink is only using 128M MB which can easily cause OOM
error.

These are DEFAULT settings.

I dusted off an old laptop so it only 3.8 GB RAM.

What does your job metrics say  ?

On Mon, 20 Apr 2020, 07:26 Xintong Song, <[hidden email]> wrote:
Hi Lasse,

From what I understand, your problem is that JVM tries to fork some native process (if you look at the exception stack the root exception is thrown from a native method) but there's no enough memory for doing that. This could happen when either Mesos is using cgroup strict mode for memory control, or there's no more memory on the machine. Flink cannot prevent native processes from using more memory. It can only reserve certain amount of memory for such native usage when requesting worker memory from the deployment environment (in your case Mesos) and allocating Java heap / direct memory.

My suggestion is to try increasing the JVM overhead configuration. You can leverage the configuration options 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in the documentation[1].
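A sketch of what that configuration might look like in flink-conf.yaml. In Flink 1.10 the defaults are min 192m, max 1g, and fraction 0.1; the values below are examples only, not tuned recommendations:

```yaml
# Illustrative flink-conf.yaml fragment: enlarge the JVM-overhead slice
# that Flink reserves for native allocations (thread stacks, I/O buffers,
# native library memory) when sizing the worker.
taskmanager.memory.jvm-overhead.min: 256m
taskmanager.memory.jvm-overhead.max: 1g
taskmanager.memory.jvm-overhead.fraction: 0.15
```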

On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman <[hidden email]> wrote:

On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <[hidden email]> wrote:
Re: Flink 1.10 Out of memory

Xintong Song
I might be wrong about how JNI works. Isn't a native method always executed in another process?

I was searching for the Java error message "Cannot allocate memory", and it seems this happens when the JVM cannot allocate memory from the OS. Given that the exception is thrown from calling a native method, I think the problem is that not enough native memory can be allocated for executing the native method.

Thank you~

Xintong Song



Re: Flink 1.10 Out of memory

Stephan Ewen
I think native methods are not in a forked process. It is just a malloc() call that failed, probably an I/O buffer or so.
This might mean that there really is no native memory available any more, meaning the process has hit its limit. In any case, a bit more JVM overhead should solve this.

Re: Flink 1.10 Out of memory

Xintong Song
True. Thanks for the clarification.

Thank you~

Xintong Song



On Fri, Apr 24, 2020 at 5:21 PM Stephan Ewen <[hidden email]> wrote:
I think native methods are not in a forked process. It is just a malloc() call that failed, probably an I/O buffer or so.
This might mean that there really is no native memory available any more, meaning the process has hit its limit. In any case, a bit more JVM overhead should solve this.

On Fri, Apr 24, 2020 at 10:24 AM Xintong Song <[hidden email]> wrote:
I might be wrong about how JNI works. Isn't a native method always executed in another process?

I was searching for the java error message "Cannot allocate memory", and it seems this happens when JVM cannot allocate memory from the OS. Given the exception is thrown from calling a native method, I think the problem is that not enough native memory can be allocated for executing the native method.

Thank you~

Xintong Song



On Fri, Apr 24, 2020 at 3:40 PM Stephan Ewen <[hidden email]> wrote:
@Xintong - out of curiosity, where do you see that this tries to fork a process? I must be overlooking something, I could only see the native method call.

On Fri, Apr 24, 2020 at 4:53 AM Xintong Song <[hidden email]> wrote:
@Stephan,
I don't think so. If JVM hits the direct memory limit, you should see the error message "OutOfMemoryError: Direct buffer memory".

Thank you~

Xintong Song



On Thu, Apr 23, 2020 at 6:11 PM Stephan Ewen <[hidden email]> wrote:
@Xintong and @Lasse could it be that the JVM hits the "Direct Memory" limit here?
Would increasing the "taskmanager.memory.framework.off-heap.size" help?

On Mon, Apr 20, 2020 at 11:02 AM Zahid Rahman <[hidden email]> wrote:
As you can see from the task manager tab of flink web dashboard 

Physical Memory:3.80 GB
JVM Heap Size:1.78 GB
Flink Managed Memory:128 MB

Flink is only using 128M MB which can easily cause OOM
error.

These are DEFAULT settings.

I dusted off an old laptop so it only 3.8 GB RAM.

What does your job metrics say  ?

On Mon, 20 Apr 2020, 07:26 Xintong Song, <[hidden email]> wrote:
Hi Lasse,

From what I understand, your problem is that JVM tries to fork some native process (if you look at the exception stack the root exception is thrown from a native method) but there's no enough memory for doing that. This could happen when either Mesos is using cgroup strict mode for memory control, or there's no more memory on the machine. Flink cannot prevent native processes from using more memory. It can only reserve certain amount of memory for such native usage when requesting worker memory from the deployment environment (in your case Mesos) and allocating Java heap / direct memory.

My suggestion is to try increasing the JVM overhead configuration. You can leverage the configuration options 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in the documentation[1].
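As a hedged sketch (values are illustrative, not tuned; the 1.10 defaults are min=192m, max=1g, fraction=0.1), the options Xintong mentions would look like this in flink-conf.yaml:

```yaml
taskmanager.memory.process.size: 2200m
# Reserve a larger slice of the process size for native/JVM overhead.
taskmanager.memory.jvm-overhead.min: 256m
taskmanager.memory.jvm-overhead.max: 1g
taskmanager.memory.jvm-overhead.fraction: 0.15
```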

On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman <[hidden email]> wrote:

On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <[hidden email]> wrote:
Hi.

We have migrated to Flink 1.10 and are facing out-of-memory exceptions; hopefully someone can point us in the right direction.

We have a job that uses broadcast state, and we sometimes run out of memory when it creates a savepoint. See the stack trace below.
We have assigned 2.2 GB per task manager and configured taskmanager.memory.process.size: 2200m
In Flink 1.9 our container was terminated because of OOM, so 1.10 does a better job, but it is still not working: the task manager leaks memory with each OOM and is finally killed by Mesos.


Any idea what we can do to figure out what settings we need to change?

Thanks in advance

Lasse Nedergaard 


WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could not close the state stream for s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
java.io.IOException: Cannot allocate memory
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
    at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
    at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277)
    at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
    at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
    at org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:167)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:83)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 3509 of job fc9318cc236d09f0bfd994f138896d6c.
org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 3509 for operator Feature extraction (8/12).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Cannot allocate memory
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
    ... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Cannot allocate memory
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:95)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:77)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:125)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.write(HadoopDataOutputStream.java:47)
    at org.apache.flink.core.fs.FSDataOutputStreamWrapper.write(FSDataOutputStreamWrapper.java:66)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:220)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeBytes(DataOutputEncoder.java:92)
    at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeString(DataOutputEncoder.java:113)
    at org.apache.avro.io.Encoder.writeString(Encoder.java:130)
    at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:323)
    at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:281)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:185)
    at org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)

Re: Flink 1.10 Out of memory

Zahid Rahman
In reply to this post by Xintong Song
https://youtu.be/UEkjRN8jRx4  22:10


- One option is to reduce Flink managed memory from the default 70% to maybe 50%.

- This error could also be caused by insufficient memory;

- the programmer maintaining a local list, thus overusing user-allocated memory through heavy processing;

- or using a small JVM;

- or the JVM spending too much time on GC.

Out of memory may have nothing to do with Flink; Flink is not necessarily at fault.

This process is known as "pimping" Flink.

Also part of pimping is to use the local disk for memory spill.
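For reference, in Flink 1.10 the managed-memory fraction is controlled by taskmanager.memory.managed.fraction (default 0.4; the 70% default belonged to the legacy taskmanager.memory.fraction of earlier releases). A flink-conf.yaml sketch with an illustrative value:

```yaml
# Shrink managed memory to leave more room for heap and native usage.
taskmanager.memory.managed.fraction: 0.3
```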


Re: Flink 1.10 Out of memory

Stephan Ewen
@zahid I would kindly ask you to rethink your approach to posting here.
Wanting to help answer questions is appreciated, but what you post is always completely disconnected from the actual issue.

The questions here usually go beyond the obvious and beyond what a simple Stack Overflow search yields. That's why they are posted here, and why the person asking is not simply searching Stack Overflow themselves.

So please make the effort to really dig into the problem and try to understand what the specific issue is, rather than posting unrelated Stack Overflow links. If you don't want to do that, please stop chiming in.



Re: Flink 1.10 Out of memory

Zahid Rahman
> " a simple Stack Overflow search yields. 
"
Actually it wasn't stack overflow but  a video I saw presented by Robert Metzger. of Apache Flink org.

Your mind  must have been fogged up 
with another thought of another email not the contents of my email clearly.

He explained  the very many solutions to the out of memory error.

Obviously I cant dig any deeper unless I have the code in front of me loaded into an IDE. 

for example I came across flink archetypes from Alibaba etc. in Eclipse.

I got  every conceivable possible error I have never seen before.

I used google and StackOverFlow  to solve each error. 

It took me about 6 hours but I finally have those archetypes working now.

Also I noticed flakey behaviour from IntelliJ when using flink examples provided from git hub.

so I loaded same flink examples into Eclipse and and NetBeans  saw same flakey behaviour was not present.

I concluded that flakey behaviour was due to intelliJ so I am continuing to spend time on Flink and haven't deleted it yet. 

I can replicate the IntelliJ  flakey behaviour for the right price.


That is software development as I understand it.

Obviously you have different views that you can debug using emailing list. 

Unlike you that  skill of software debugging by email I do not have so I will not  "chime" any more. Nor can I read the mind of another on what is his skill level or product framework familiarity. 

You can have all the glory of chiming.

But do keep in mind it was a YouTube video and not Stack Overflow, which is mostly a text-based website that self-reliant people use to address buggy software.

I am quite happy to use the pains of others before me on Stack Overflow to resolve the same software bugs.

It is my view that Stack Overflow is effectively a partner program for Apache frameworks. How did we develop software before Google, Stack Overflow, or mailing lists?

I would have to say it was with comprehensive product documentation and making use of software development tools, and mainly with an understanding that software development is the tight binding of traceable logic flow.

Absolutely no magic, except maybe in the case of an intermittent error.

That was a long-winded personal attack, so this is a long-winded explanation.


I too am a member of a soon-to-be-extinct tribe; can I be Apache too?

Happy hunting of Apaches :).

On Fri, 24 Apr 2020, 13:54 Stephan Ewen, <[hidden email]> wrote:
@zahid I would kindly ask you to rethink your approach to posting here.
Wanting to help answer questions is appreciated, but what you post is always completely disconnected from the actual issue.

The questions here usually go beyond the obvious and beyond what a simple Stack Overflow search yields. That's why they are posted here, and why the person asking is not simply searching Stack Overflow themselves.

So please either make the effort to really dig into the problem and understand the specific issue, rather than posting unrelated Stack Overflow links, or, if you don't want to do that, stop chiming in.


On Fri, Apr 24, 2020 at 1:15 PM Zahid Rahman <[hidden email]> wrote:
https://youtu.be/UEkjRN8jRx4  22:10


- One option is to reduce Flink managed memory from the default 70% to maybe 50%.

- This error could also be caused by missing memory:

- maintaining a local list, so the programmer over-uses user-allocated memory through heavy processing;

- or using a small JVM;

- or the JVM spending too much time on GC.

Out of memory has nothing to do with Flink; Flink is not at fault.


This process is known as "pimping" Flink.

Another part of pimping is to use the local disk for memory spill.

On Fri, 24 Apr 2020, 03:53 Xintong Song, <[hidden email]> wrote:
@Stephan,
I don't think so. If JVM hits the direct memory limit, you should see the error message "OutOfMemoryError: Direct buffer memory".

Thank you~

Xintong Song



On Thu, Apr 23, 2020 at 6:11 PM Stephan Ewen <[hidden email]> wrote:
@Xintong and @Lasse could it be that the JVM hits the "Direct Memory" limit here?
Would increasing the "taskmanager.memory.framework.off-heap.size" help?
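Concretely, that suggestion would be a `flink-conf.yaml` change along these lines. The 128m default is Flink 1.10's documented value; the raised value is only an illustrative guess, not a recommendation:

```yaml
# Sketch of the suggestion above - values are illustrative assumptions.
taskmanager.memory.process.size: 2200m            # total task manager process memory (from Lasse's setup)
taskmanager.memory.framework.off-heap.size: 256m  # raised from the 128m default to give direct memory more headroom
```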

On Mon, Apr 20, 2020 at 11:02 AM Zahid Rahman <[hidden email]> wrote:
As you can see from the task manager tab of flink web dashboard 

Physical Memory:3.80 GB
JVM Heap Size:1.78 GB
Flink Managed Memory:128 MB

Flink is only using 128 MB of managed memory, which can easily cause an OOM
error.

These are DEFAULT settings.

I dusted off an old laptop, so it only has 3.8 GB RAM.

What does your job metrics say  ?

On Mon, 20 Apr 2020, 07:26 Xintong Song, <[hidden email]> wrote:
Hi Lasse,

From what I understand, your problem is that the JVM tries to fork some native process (if you look at the exception stack, the root exception is thrown from a native method) but there's not enough memory for doing that. This could happen when either Mesos is using cgroup strict mode for memory control, or there's no more memory on the machine. Flink cannot prevent native processes from using more memory. It can only reserve a certain amount of memory for such native usage when requesting worker memory from the deployment environment (in your case Mesos) and allocating Java heap / direct memory.

My suggestion is to try increasing the JVM overhead configuration. You can leverage the configuration options 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in the documentation[1].
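Sketched as a `flink-conf.yaml` fragment, the values shown below are the Flink 1.10 defaults as I recall them; treat the exact numbers as assumptions and tune them for your job:

```yaml
# JVM overhead: memory reserved for native usage (thread stacks, code cache,
# forked native processes, ...). Increasing it leaves more room for the
# "Cannot allocate memory" native allocations seen in the stack trace.
taskmanager.memory.jvm-overhead.min: 192m      # default 192m; try raising this
taskmanager.memory.jvm-overhead.max: 1g        # default 1g
taskmanager.memory.jvm-overhead.fraction: 0.1  # default: 0.1 of total process size
```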

On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman <[hidden email]> wrote:

On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <[hidden email]> wrote:
Hi.

We have migrated to Flink 1.10 and face out of memory exception and hopeful can someone point us in the right direction. 

We have a job that use broadcast state, and we sometimes get out memory when it creates a savepoint. See stacktrack below. 
We have assigned 2.2 GB/task manager and configured  taskmanager.memory.process.size : 2200m
In Flink 1.9 our container was terminated because OOM, so 1.10 do a better job, but it still not working and the task manager is leaking mem for each OOM and finial kill by Mesos


Any idea what we can do to figure out what settings we need to change?

Thanks in advance

Lasse Nedergaard 


WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could not close the state stream for s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa. java.io.IOException: Cannot allocate memory at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:326) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at java.io.FilterOutputStream.flush(FilterOutputStream.java:140) at java.io.FilterOutputStream.close(FilterOutputStream.java:158) at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64) at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277) at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263) at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250) at org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122) at org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:167) at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:83) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458) at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53) at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 3509 of job fc9318cc236d09f0bfd994f138896d6c. org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 3509 for operator Feature extraction (8/12). at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Cannot allocate memory at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461) at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143) ... 
3 common frames omitted Caused by: org.apache.flink.util.SerializedThrowable: Cannot allocate memory at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:326) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:95) at java.io.FilterOutputStream.write(FilterOutputStream.java:77) at java.io.FilterOutputStream.write(FilterOutputStream.java:125) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57) at java.io.DataOutputStream.write(DataOutputStream.java:107) at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.write(HadoopDataOutputStream.java:47) at org.apache.flink.core.fs.FSDataOutputStreamWrapper.write(FSDataOutputStreamWrapper.java:66) at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:220) at java.io.DataOutputStream.write(DataOutputStream.java:107) at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeBytes(DataOutputEncoder.java:92) at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeString(DataOutputEncoder.java:113) at org.apache.avro.io.Encoder.writeString(Encoder.java:130) at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:323) at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:281) at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82) at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144) at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98) at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195) at 
org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83) at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82) at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234) at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82) at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144) at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98) at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195) at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83) at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72) at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:185) at org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109) at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167) at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108) at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
Re: Flink 1.10 Out of memory

Som Lima
@Zahid

What Apache means is: don't be like Jesse Anderson, who recommended Flink on the basis that Apache only uses maps, as seen in the video.

Flink, meanwhile, uses ValueState and State in the Streaming API.

Although it appears Jesse Anderson only looked as deep as the DataStream hello world,
you are required to think and look deeper.


Watch "Airbus makes more of the sky with Flink - Jesse Anderson & Hassene Ben Salem" on YouTube





On Fri, 24 Apr 2020, 17:38 Zahid Rahman, <[hidden email]> wrote:
Re: Flink 1.10 Out of memory

Stephan Ewen
Okay, I think we need to close this thread.

@Lasse Nedergaard - I hope your original question was answered by increasing the JVM overhead. If not, please post again (cc me) and we will re-boot the discussion.

@Som Lima - It is not nice to say bad things about other people here on the mailing list; please refrain from doing that. We want to create a positive environment here where we support each other and focus on technical issues. I also don't agree with the assessment of Jesse Anderson; one video does not portray the full situation.

@Zahid Rahman - My post was not meant as a personal attack; if it came across as such, that was not the intention. Your response wasn't very understandable, but it had a number of sentences with a very strange sentiment. From a quick search it seems that you have had similar issues in other communities already, like Maven, Hadoop, and Spark.
So let me restate this: we strive to create a friendly, helpful, professional community here, and this should be reflected in the tone and approach on this mailing list. If you can work with that, then this community is open to you; otherwise not.

Best,
Stephan


On Sat, Apr 25, 2020 at 12:07 AM Som Lima <[hidden email]> wrote:
@Zahir

what the Apache means is don't  be like Jesse Anderson who recommended Flink on the basis Apache only uses  maps as seen in video.

While Flink uses ValueState and State in Streaming API. 

Although it appears Jesse Anderson only looked as deep as the data stream helloworld.
You are required to think and look deeper.


Watch "Airbus makes more of the sky with Flink - Jesse Anderson & Hassene Ben Salem" on YouTube





On Fri, 24 Apr 2020, 17:38 Zahid Rahman, <[hidden email]> wrote:
> " a simple Stack Overflow search yields. 
"
Actually it wasn't stack overflow but  a video I saw presented by Robert Metzger. of Apache Flink org.

Your mind  must have been fogged up 
with another thought of another email not the contents of my email clearly.

He explained  the very many solutions to the out of memory error.

Obviously I cant dig any deeper unless I have the code in front of me loaded into an IDE. 

for example I came across flink archetypes from Alibaba etc. in Eclipse.

I got  every conceivable possible error I have never seen before.

I used google and StackOverFlow  to solve each error. 

It took me about 6 hours but I finally have those archetypes working now.

Also I noticed flakey behaviour from IntelliJ when using flink examples provided from git hub.

so I loaded same flink examples into Eclipse and and NetBeans  saw same flakey behaviour was not present.

I concluded that flakey behaviour was due to intelliJ so I am continuing to spend time on Flink and haven't deleted it yet. 

I can replicate the IntelliJ  flakey behaviour for the right price.


That is software development as I understand it.

Obviously you have different views that you can debug using emailing list. 

Unlike you that  skill of software debugging by email I do not have so I will not  "chime" any more. Nor can I read the mind of another on what is his skill level or product framework familiarity. 

You can have all the glory of chiming.

But do keep in mind it was a YouTube video and not  StackOverFlow which is mostly a text based website where other people  who are self reliant use it to address buggy software. 

I am quite happy to use the crying pains of others before me on stack overflow to resolve the same software bugs.

It is my view that stack overflow is a partner program with Apache frameworks . How did we develop software before google or StackOverFlow  or mailing lists ? 

I would have to say it was with comprehensive product documents and makeuse.org of  software development tools. Mainly an understanding that software development is tight binding of teaceable logic flow. 

Absolutely no magic except in the case of intermittent error may be. 

That was aong winded personal attack so this is a long winded  explanation. 


I too am a member of a soon to be extinct tribe , can I be apache too  ?

Happy  Hunting of Apaches  :).
















On Fri, 24 Apr 2020, 13:54 Stephan Ewen, <[hidden email]> wrote:
@zahid I would kindly ask you to rethink you approach to posting here. 
Wanting to help answering questions is appreciated, but what you post is always completely disconnected from the actual issue.

The questions here usually go beyond the obvious and beyond what a simple Stack Overflow search yields. That's why they are posted here and the person asking is not simply searching Stack Overflow themselves.

So please either make the effort to really dig into the problem and try to understand what is the specific issue, rather than posting unrelated stack overflow links. If you don't want to do that, please stop chiming in.


On Fri, Apr 24, 2020 at 1:15 PM Zahid Rahman <[hidden email]> wrote:
https://youtu.be/UEkjRN8jRx4  22:10


-  one  option is to reduce flink managed memory from default  70% to may be 50%.

 - This error could be caused also  due to missing memory ;

- maintaining a local list by programmer so over using user allocated memory caused by heavy processing ;

 - or using a small jvm ,

- Or JVM  spends too much time on gc.

Out of memory has nothing to do flink or flink is not at fault.


This process is known as "pimping" flink.

also part of pimping is use to use local disk for memory spill.

On Fri, 24 Apr 2020, 03:53 Xintong Song, <[hidden email]> wrote:
@Stephan,
I don't think so. If JVM hits the direct memory limit, you should see the error message "OutOfMemoryError: Direct buffer memory".

Thank you~

Xintong Song



On Thu, Apr 23, 2020 at 6:11 PM Stephan Ewen <[hidden email]> wrote:
@Xintong and @Lasse could it be that the JVM hits the "Direct Memory" limit here?
Would increasing the "taskmanager.memory.framework.off-heap.size" help?

On Mon, Apr 20, 2020 at 11:02 AM Zahid Rahman <[hidden email]> wrote:
As you can see from the task manager tab of flink web dashboard 

Physical Memory:3.80 GB
JVM Heap Size:1.78 GB
Flink Managed Memory:128 MB

Flink is only using 128M MB which can easily cause OOM
error.

These are DEFAULT settings.

I dusted off an old laptop so it only 3.8 GB RAM.

What does your job metrics say  ?

On Mon, 20 Apr 2020, 07:26 Xintong Song, <[hidden email]> wrote:
Hi Lasse,

From what I understand, your problem is that JVM tries to fork some native process (if you look at the exception stack the root exception is thrown from a native method) but there's no enough memory for doing that. This could happen when either Mesos is using cgroup strict mode for memory control, or there's no more memory on the machine. Flink cannot prevent native processes from using more memory. It can only reserve certain amount of memory for such native usage when requesting worker memory from the deployment environment (in your case Mesos) and allocating Java heap / direct memory.

My suggestion is to try increasing the JVM overhead configuration. You can leverage the configuration options 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in the documentation[1].
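As a sketch, those options could be set in flink-conf.yaml like this (the numbers are illustrative assumptions for a ~2.2 GB process, not tested values):

```yaml
# JVM overhead is reserved for native allocations outside heap/direct/managed
# memory (thread stacks, code cache, forked native processes, etc.).
# The Flink 1.10 defaults are min 192m, max 1g, fraction 0.1.
taskmanager.memory.jvm-overhead.min: 256m
taskmanager.memory.jvm-overhead.max: 512m
taskmanager.memory.jvm-overhead.fraction: 0.2
```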

On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman <[hidden email]> wrote:

On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <[hidden email]> wrote:
Hi.

We have migrated to Flink 1.10 and are facing out-of-memory exceptions; hopefully someone can point us in the right direction.

We have a job that uses broadcast state, and we sometimes run out of memory when it creates a savepoint. See the stack trace below.
We have assigned 2.2 GB per task manager and configured taskmanager.memory.process.size: 2200m
In Flink 1.9 our container was terminated because of OOM, so 1.10 does a better job, but it is still not working: the task manager leaks memory with each OOM and is finally killed by Mesos.


Any idea what we can do to figure out what settings we need to change?

Thanks in advance

Lasse Nedergaard 


WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could not close the state stream for s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
java.io.IOException: Cannot allocate memory
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
	at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
	at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
	at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
	at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277)
	at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
	at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
	at org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:167)
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:83)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 3509 of job fc9318cc236d09f0bfd994f138896d6c.
org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 3509 for operator Feature extraction (8/12).
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Cannot allocate memory
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
	... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Cannot allocate memory
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:95)
	at java.io.FilterOutputStream.write(FilterOutputStream.java:77)
	at java.io.FilterOutputStream.write(FilterOutputStream.java:125)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.write(HadoopDataOutputStream.java:47)
	at org.apache.flink.core.fs.FSDataOutputStreamWrapper.write(FSDataOutputStreamWrapper.java:66)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:220)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeBytes(DataOutputEncoder.java:92)
	at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeString(DataOutputEncoder.java:113)
	at org.apache.avro.io.Encoder.writeString(Encoder.java:130)
	at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:323)
	at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:281)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
	at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
	at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
	at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
	at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
	at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
	at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:185)
	at org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)