Hi.
We have migrated to Flink 1.10 and are running into out-of-memory exceptions. Hopefully someone can point us in the right direction.
We have a job that uses broadcast state, and we sometimes run out of memory when it creates a savepoint. See the stack trace below.
We have assigned 2.2 GB per task manager and configured taskmanager.memory.process.size: 2200m
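To make that budget concrete, here is a rough sketch of how a 2200m process size might be split under the Flink 1.10 memory model. The default values in it (96m metaspace, 0.1 JVM overhead fraction with a 192m minimum, 0.4 managed fraction, 0.1 network fraction, 128m framework heap and off-heap) are assumptions based on the stock 1.10.0 defaults, not values read from our actual flink-conf.yaml, and the function name is made up for illustration.

```python
def clamp(value, low, high):
    """Keep value inside the [low, high] range."""
    return max(low, min(high, value))


def breakdown(process_mb,
              metaspace_mb=96,          # assumed taskmanager.memory.jvm-metaspace.size
              overhead_fraction=0.1,    # assumed taskmanager.memory.jvm-overhead.fraction
              overhead_min_mb=192,      # assumed taskmanager.memory.jvm-overhead.min
              overhead_max_mb=1024,     # assumed taskmanager.memory.jvm-overhead.max
              managed_fraction=0.4,     # assumed taskmanager.memory.managed.fraction
              network_fraction=0.1,     # assumed taskmanager.memory.network.fraction
              network_min_mb=64,        # assumed taskmanager.memory.network.min
              network_max_mb=1024,      # assumed taskmanager.memory.network.max
              framework_heap_mb=128,    # assumed taskmanager.memory.framework.heap.size
              framework_offheap_mb=128, # assumed taskmanager.memory.framework.off-heap.size
              task_offheap_mb=0):       # assumed taskmanager.memory.task.off-heap.size
    """Estimate the memory components (in MB) derived from the total process size."""
    # JVM overhead is a fraction of the total process size, clamped to min/max.
    jvm_overhead = clamp(process_mb * overhead_fraction, overhead_min_mb, overhead_max_mb)
    # Total Flink memory is what remains after metaspace and JVM overhead.
    total_flink = process_mb - metaspace_mb - jvm_overhead
    # Managed and network memory are fractions of total Flink memory.
    managed = total_flink * managed_fraction
    network = clamp(total_flink * network_fraction, network_min_mb, network_max_mb)
    # Task heap is whatever is left of total Flink memory.
    task_heap = (total_flink - framework_heap_mb - framework_offheap_mb
                 - task_offheap_mb - managed - network)
    return {
        "jvm_metaspace": metaspace_mb,
        "jvm_overhead": jvm_overhead,
        "total_flink": total_flink,
        "framework_heap": framework_heap_mb,
        "framework_offheap": framework_offheap_mb,
        "task_offheap": task_offheap_mb,
        "managed": managed,
        "network": network,
        "task_heap": task_heap,
    }


if __name__ == "__main__":
    for name, size_mb in breakdown(2200).items():
        print(f"{name:18s} {size_mb:8.1f} MB")
```

If those assumed defaults apply, only about 220m of the 2200m is set aside as JVM overhead; the rest is split between heap, managed, network, and metaspace.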
In Flink 1.9 our container was terminated because of OOM, so 1.10 does a better job, but it is still not working: the task manager leaks memory on each OOM and is finally killed by Mesos.
Any idea what we can do to figure out which settings we need to change?
Thanks in advance
Lasse Nedergaard
WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could not close the state stream for s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
java.io.IOException: Cannot allocate memory
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277)
at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
at org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:167)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:83)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 3509 of job fc9318cc236d09f0bfd994f138896d6c.
org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 3509 for operator Feature extraction (8/12).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Cannot allocate memory
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Cannot allocate memory
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:95)
at java.io.FilterOutputStream.write(FilterOutputStream.java:77)
at java.io.FilterOutputStream.write(FilterOutputStream.java:125)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.write(HadoopDataOutputStream.java:47)
at org.apache.flink.core.fs.FSDataOutputStreamWrapper.write(FSDataOutputStreamWrapper.java:66)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:220)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeBytes(DataOutputEncoder.java:92)
at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeString(DataOutputEncoder.java:113)
at org.apache.avro.io.Encoder.writeString(Encoder.java:130)
at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:323)
at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:281)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:185)
at org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)