memory tuning

memory tuning

Marco Villalobos-2

I have a Flink job that uses a window to collect and aggregate time-series data from many devices into one object (let's call it X).

X contains time-series data, so it holds many String and Instant objects, a HashMap, and many objects of another type (let's call it Y).

When I collect 4 X instances containing 800000 Y instances, that equates to approximately 172 MB of data.

That should be okay, because my machine has 32 GB of RAM, and I allocated 1.5 GB to each task manager.

However, the job fails with out-of-memory errors, and I think they happen during serialization. I am not sure whether that is a coincidence.

I am using the RocksDB state backend, as well as Kryo serialization.

I am already refactoring my code from processing-time semantics to event-time semantics, and I am trying to store smaller objects in keyed state rather than this one large object. In the meantime, our machines have plenty of memory. What can I do to fix this?

SAMPLE STACK TRACE

2020-12-17 02:45:55,524 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
        at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1088) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1062) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1183) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: java.lang.OutOfMemoryError: GC overhead limit exceeded
        ... 12 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
2020-12-17 02:45:55,554 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Triggering cancellation of task code enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d).
2020-12-17 02:45:38,981 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - aggregate daily average window function (2/2) (745ff4669f9c1812de5b717c87a36a26) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.GeneratedSerializationConstructorAccessor230.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_252]
        at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) ~[965ae4d.jar:?]
        at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[965ae4d.jar:?]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[965ae4d.jar:?]
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.getInternal(AbstractRocksDBAppendingState.java:64) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.contrib.streaming.state.RocksDBAggregatingState.add(RocksDBAggregatingState.java:101) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$539/1319281920.runDefaultAction(Unknown Source) ~[?:?]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
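
A note on the second trace above: the frames `RocksDBAggregatingState.add`, `AbstractRocksDBAppendingState.getInternal` and `KryoSerializer.deserialize` show the window accumulator being read back from state and deserialized while a single element is being added. With the RocksDB backend, state is held as serialized bytes, so each `add` reads, deserializes, updates, and re-serializes the whole accumulator. The sketch below is not Flink code. It is a plain-Java stand-in, with Java serialization in place of Kryo, a `byte[]` field in place of RocksDB, and a hypothetical class name, and it only illustrates how the volume of deserialized bytes grows when the accumulator itself grows with every element.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Hypothetical stand-in for a serialized state backend: the accumulator is
// kept only as bytes and is fully rebuilt on every add().
final class SerializedAccumulator {
    private byte[] stored;          // plays the role of the value held in RocksDB
    private long bytesDeserialized; // total bytes read back across all add() calls

    void add(String key, double value) {
        HashMap<String, Double> acc =
                (stored == null) ? new HashMap<String, Double>() : read(stored);
        acc.put(key, value);        // this "aggregation" keeps every element
        stored = write(acc);
    }

    long bytesDeserialized() {
        return bytesDeserialized;
    }

    int storedSize() {
        return stored == null ? 0 : stored.length;
    }

    @SuppressWarnings("unchecked")
    private HashMap<String, Double> read(byte[] bytes) {
        bytesDeserialized += bytes.length;
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (HashMap<String, Double>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    private static byte[] write(HashMap<String, Double> acc) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(acc);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        SerializedAccumulator state = new SerializedAccumulator();
        for (int i = 0; i < 2_000; i++) {
            state.add("sensor-" + i, i);
        }
        System.out.println("final accumulator size:   " + state.storedSize() + " bytes");
        System.out.println("total bytes deserialized: " + state.bytesDeserialized() + " bytes");
    }
}
```

If the real accumulator behaves like this one and retains every element, the temporary objects created per incoming record scale with the size of the whole accumulator, which would be consistent with the heap error surfacing inside Kryo deserialization. An accumulator that keeps only running totals would not show this growth.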



Re: memory tuning

Matthias
Hi Marco,
Could you share the preconfiguration logs? They are printed at the beginning of the TaskManagers' logs and contain a summary of the memory configuration in use.

Best,
Matthias

On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <[hidden email]> wrote:


Re: memory tuning

Marco Villalobos-2
Yes, I will do that. 

LOGS FROM PRODUCTION

2021-01-26 04:03:50,804 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] - --------------------------------------------------------------------------------
2021-01-26 04:03:50,807 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  Starting YARN TaskExecutor runner (Version: 1.11.0, Scala: 2.12, Rev:d04872d, Date:2020-06-29T16:13:14+02:00)
2021-01-26 04:03:50,807 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  OS current user: yarn
2021-01-26 04:03:50,937 WARN  org.apache.hadoop.util.NativeCodeLoader                      [] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-01-26 04:03:50,987 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  Current Hadoop/Kerberos user: hadoop
2021-01-26 04:03:50,987 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  JVM: OpenJDK 64-Bit Server VM - Amazon.com Inc. - 1.8/25.252-b09
2021-01-26 04:03:50,987 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  Maximum heap size: 3289 MiBytes
2021-01-26 04:03:50,988 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  JAVA_HOME: /etc/alternatives/jre
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  Hadoop version: 3.2.1
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  JVM Options:
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Xmx3597035049
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Xms3597035049
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -XX:MaxDirectMemorySize=880468305
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -XX:MaxMetaspaceSize=268435456
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Dlog.file=/var/log/hadoop-yarn/containers/application_1611280261341_0015/container_1611280261341_0015_01_000004/taskmanager.log
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Dlog4j.configuration=file:./log4j.properties
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Dlog4j.configurationFile=file:./log4j.properties
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  Program Arguments:
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.framework.off-heap.size=134217728b
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.network.max=746250577b
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.network.min=746250577b
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.framework.heap.size=134217728b
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.managed.size=2985002310b
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.cpu.cores=1.0
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.task.heap.size=3462817321b
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.task.off-heap.size=0b
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     --configDir
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     .
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Djobmanager.rpc.address=ip-10-42-1-200.us-west-2.compute.internal
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Dweb.port=0
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Dweb.tmpdir=/tmp/flink-web-af05fbc0-d86d-4c74-9467-e86791a20053
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Djobmanager.rpc.port=36491
2021-01-26 04:03:50,991 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Drest.address=ip-10-42-1-200.us-west-2.compute.internal

// classpath output

2021-01-26 04:03:50,994 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] - --------------------------------------------------------------------------------
2021-01-26 04:03:50,995 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] - Registered UNIX signal handlers for [TERM, HUP, INT]
2021-01-26 04:03:50,997 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] - Current working Directory: /mnt/yarn/usercache/hadoop/appcache/application_1611280261341_0015/container_1611280261341_0015_01_000004
2021-01-26 04:03:51,007 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 8g
2021-01-26 04:03:51,007 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: internal.jobgraph-path, job.graph
2021-01-26 04:03:51,007 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: env.yarn.conf.dir, /etc/hadoop/conf
2021-01-26 04:03:51,007 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2021-01-26 04:03:51,007 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: high-availability.cluster-id, application_1611280261341_0015
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.target, yarn-per-job
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 4g
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.preallocate, false
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.savepoint.ignore-unclaimed-state, false
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.attached, true
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: internal.cluster.execution-mode, NORMAL
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.shutdown-on-attached-exit, false
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: pipeline.jars, file:/home/hadoop/bp-whiting/develop-17e9fd0e.jar
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 2
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
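
The production log above does not include a memory summary, but one can be reconstructed from the JVM options and `-D` values it does print. The sketch below does only that arithmetic. The class and method names are made up for illustration, every constant is copied from the log, and the JVM overhead is derived as a remainder on the assumption that total process memory is the sum of total Flink memory, metaspace, and JVM overhead.

```java
// Reconstructs the production TaskManager memory layout from logged values.
// Nothing is measured here; all constants are copied from the log above.
final class ProductionMemoryLayout {
    static final long FRAMEWORK_HEAP = 134_217_728L;     // taskmanager.memory.framework.heap.size
    static final long TASK_HEAP = 3_462_817_321L;        // taskmanager.memory.task.heap.size
    static final long MANAGED = 2_985_002_310L;          // taskmanager.memory.managed.size
    static final long FRAMEWORK_OFF_HEAP = 134_217_728L; // taskmanager.memory.framework.off-heap.size
    static final long TASK_OFF_HEAP = 0L;                // taskmanager.memory.task.off-heap.size
    static final long NETWORK = 746_250_577L;            // taskmanager.memory.network.min/max
    static final long METASPACE = 268_435_456L;          // -XX:MaxMetaspaceSize
    static final long PROCESS_SIZE = 8L * 1024 * 1024 * 1024; // taskmanager.memory.process.size = 8g

    // Should reproduce -Xmx / -Xms.
    static long jvmHeap() {
        return FRAMEWORK_HEAP + TASK_HEAP;
    }

    // Should reproduce -XX:MaxDirectMemorySize.
    static long directMemory() {
        return FRAMEWORK_OFF_HEAP + TASK_OFF_HEAP + NETWORK;
    }

    static long totalFlinkMemory() {
        return jvmHeap() + MANAGED + directMemory();
    }

    // Assumption: whatever is left of the process size after Flink memory
    // and metaspace is the JVM overhead.
    static long jvmOverhead() {
        return PROCESS_SIZE - totalFlinkMemory() - METASPACE;
    }

    static double toGiB(long bytes) {
        return bytes / (1024.0 * 1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        System.out.printf("JVM heap:           %d bytes (%.3f GiB)%n", jvmHeap(), toGiB(jvmHeap()));
        System.out.printf("Direct memory:      %d bytes%n", directMemory());
        System.out.printf("Total Flink memory: %d bytes (%.3f GiB)%n",
                totalFlinkMemory(), toGiB(totalFlinkMemory()));
        System.out.printf("JVM overhead:       %d bytes%n", jvmOverhead());
    }
}
```

The heap and direct-memory sums match the logged `-Xmx3597035049` and `-XX:MaxDirectMemorySize=880468305`, so the production heap is about 3.35 GiB out of the 8g process size.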


LOGS FROM LOCAL

2021-01-25 21:41:18,023 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - --------------------------------------------------------------------------------
2021-01-25 21:41:18,025 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Preconfiguration: 
2021-01-25 21:41:18,025 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - 


TM_RESOURCE_PARAMS extraction logs:
jvm_params: -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456
dynamic_configs: -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b 
logs: WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
INFO  [] - Loading configuration property: jobmanager.rpc.address, localhost
INFO  [] - Loading configuration property: jobmanager.rpc.port, 6123
INFO  [] - Loading configuration property: jobmanager.memory.process.size, 1600m
INFO  [] - Loading configuration property: taskmanager.memory.process.size, 1728m
INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
INFO  [] - Loading configuration property: parallelism.default, 1
INFO  [] - Loading configuration property: jobmanager.execution.failover-strategy, region
INFO  [] - Loading configuration property: s3.endpoint, http://localhost:4566
INFO  [] - Loading configuration property: s3.path.style.access, true
INFO  [] - Loading configuration property: taskmanager.memory.process.size, 4g
INFO  [] - Final TaskExecutor Memory configuration:
INFO  [] -   Total Process Memory:          4.000gb (4294967296 bytes)
INFO  [] -     Total Flink Memory:          3.350gb (3597035104 bytes)
INFO  [] -       Total JVM Heap Memory:     1.550gb (1664299798 bytes)
INFO  [] -         Framework:               128.000mb (134217728 bytes)
INFO  [] -         Task:                    1.425gb (1530082070 bytes)
INFO  [] -       Total Off-heap Memory:     1.800gb (1932735306 bytes)
INFO  [] -         Managed:                 1.340gb (1438814063 bytes)
INFO  [] -         Total JVM Direct Memory: 471.040mb (493921243 bytes)
INFO  [] -           Framework:             128.000mb (134217728 bytes)
INFO  [] -           Task:                  0 bytes
INFO  [] -           Network:               343.040mb (359703515 bytes)
INFO  [] -     JVM Metaspace:               256.000mb (268435456 bytes)
INFO  [] -     JVM Overhead:                409.600mb (429496736 bytes)

2021-01-25 21:41:18,025 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - --------------------------------------------------------------------------------
2021-01-25 21:41:18,025 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Starting TaskManager (Version: 1.11.0, Scala: 2.12, Rev:d04872d, Date:2020-06-29T16:13:14+02:00)
2021-01-25 21:41:18,025 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  OS current user: mvillalobos
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Current Hadoop/Kerberos user: <no hadoop dependency found>
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  JVM: OpenJDK 64-Bit Server VM - Azul Systems, Inc. - 11/11.0.6+10-LTS
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Maximum heap size: 1588 MiBytes
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  JAVA_HOME: /Users/mvillalobos/.sdkman/candidates/java/current
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  No Hadoop Dependency available
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  JVM Options:
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -XX:+UseG1GC
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Xmx1664299798
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Xms1664299798
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -XX:MaxDirectMemorySize=493921243
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -XX:MaxMetaspaceSize=268435456
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dlog.file=/Users/mvillalobos/dev/flink/flink-1.11.0/log/flink-mvillalobos-taskexecutor-1-mvillalobos.beyond.ai.log
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dlog4j.configuration=file:/Users/mvillalobos/dev/flink/flink-1.11.0/conf/log4j.properties
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dlog4j.configurationFile=file:/Users/mvillalobos/dev/flink/flink-1.11.0/conf/log4j.properties
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dlogback.configurationFile=file:/Users/mvillalobos/dev/flink/flink-1.11.0/conf/logback.xml
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Program Arguments:
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     --configDir
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     /Users/mvillalobos/dev/flink/flink-1.11.0/conf
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.framework.off-heap.size=134217728b
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.network.max=359703515b
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.network.min=359703515b
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.framework.heap.size=134217728b
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.managed.size=1438814063b
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.cpu.cores=1.0
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.task.heap.size=1530082070b
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.task.off-heap.size=0b
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Classpath: /Users/mvillalobos/dev/flink/flink-1.11.0/lib/flink-csv-1.11.0.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/flink-json-1.11.0.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/flink-shaded-zookeeper-3.4.14.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/flink-table-blink_2.12-1.11.0.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/flink-table_2.12-1.11.0.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/log4j-1.2-api-2.12.1.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/log4j-api-2.12.1.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/log4j-core-2.12.1.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/log4j-slf4j-impl-2.12.1.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/flink-dist_2.12-1.11.0.jar:::
2021-01-25 21:41:18,029 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - --------------------------------------------------------------------------------
2021-01-25 21:41:18,030 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Registered UNIX signal handlers for [TERM, HUP, INT]
2021-01-25 21:41:18,034 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Maximum number of open file descriptors is 10240.
2021-01-25 21:41:18,045 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, localhost
2021-01-25 21:41:18,045 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123
2021-01-25 21:41:18,046 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2021-01-25 21:41:18,046 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2021-01-25 21:41:18,046 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2021-01-25 21:41:18,046 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 1
2021-01-25 21:41:18,046 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2021-01-25 21:41:18,047 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3.endpoint, http://localhost:4566
2021-01-25 21:41:18,048 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3.path.style.access, true
2021-01-25 21:41:18,048 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 4g


On Tue, Jan 26, 2021 at 12:34 AM Matthias Pohl <[hidden email]> wrote:
Hi Marco,
Could you share the preconfiguration logs? They are printed at the beginning of the TaskManagers' logs and contain a summary of the memory configuration in use.

Best,
Matthias

On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <[hidden email]> wrote:

I have a Flink job that collects and aggregates time-series data from many devices into one object (let's call it X) that is collected by a window.

X contains time-series data, so it holds many String and Instant objects, a HashMap, and objects of another type (let's call it Y).

When I collect 4 X instances containing 800000 Y instances, that amounts to approximately 172 MB of data.

That should be okay, because my machine has 32 GB of RAM, and I allocated 1.5 GB to each task manager.

However, it fails with out-of-memory errors, and I think it happens during serialization. I am not sure whether that is a coincidence.

I am using the RocksDB state backend, as well as Kryo serialization.

I am already refactoring my code from processing-time semantics to event-time semantics, and I am trying to store smaller objects in keyed state rather than this large object. In the meantime, our machines have plenty of memory. What can I do to fix this?

SAMPLE STACK TRACE

2020-12-17 02:45:55,524 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
        at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1088) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1062) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1183) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: java.lang.OutOfMemoryError: GC overhead limit exceeded
        ... 12 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
2020-12-17 02:45:55,554 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Triggering cancellation of task code enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d).
2020-12-17 02:45:38,981 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - aggregate daily average window function (2/2) (745ff4669f9c1812de5b717c87a36a26) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.GeneratedSerializationConstructorAccessor230.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_252]
        at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) ~[965ae4d.jar:?]
        at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[965ae4d.jar:?]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[965ae4d.jar:?]
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.getInternal(AbstractRocksDBAppendingState.java:64) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.contrib.streaming.state.RocksDBAggregatingState.add(RocksDBAggregatingState.java:101) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$539/1319281920.runDefaultAction(Unknown Source) ~[?:?]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
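
The second trace hints at where the garbage comes from: WindowOperator.processElement calls RocksDBAggregatingState.add, which calls getInternal and then KryoSerializer.deserialize. With the RocksDB backend the accumulator is stored in serialized form, so each incoming element appears to read and rebuild the whole accumulator before adding to it. The plain-Java sketch below imitates that read-modify-write cycle, with java.io serialization standing in for Kryo. The class and method names (AccumulatorChurn, CollectAll, SumCount) are hypothetical and none of this is Flink API. It compares an accumulator that keeps every sample with one that keeps only a running sum and count.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

// Hypothetical stand-in for the state access pattern; not Flink code.
final class AccumulatorChurn {

    /** Keeps every sample, like an accumulator that collects raw readings. */
    static final class CollectAll implements Serializable {
        private static final long serialVersionUID = 1L;
        final ArrayList<Double> samples = new ArrayList<>();
    }

    /** Keeps only what an average needs. */
    static final class SumCount implements Serializable {
        private static final long serialVersionUID = 1L;
        double sum;
        long count;
    }

    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Total bytes deserialized while adding n elements one at a time. */
    static long bytesReadCollectAll(int n) {
        byte[] state = serialize(new CollectAll());
        long bytesRead = 0;
        for (int i = 0; i < n; i++) {
            bytesRead += state.length;                        // read the stored bytes
            CollectAll acc = (CollectAll) deserialize(state); // rebuild every object
            acc.samples.add((double) i);                      // add one element
            state = serialize(acc);                           // write everything back
        }
        return bytesRead;
    }

    /** Same cycle, but the accumulator never grows. */
    static long bytesReadSumCount(int n) {
        byte[] state = serialize(new SumCount());
        long bytesRead = 0;
        for (int i = 0; i < n; i++) {
            bytesRead += state.length;
            SumCount acc = (SumCount) deserialize(state);
            acc.sum += i;
            acc.count++;
            state = serialize(acc);
        }
        return bytesRead;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1_000, 2_000, 4_000}) {
            System.out.printf("n=%d collectAll=%d bytes, sumCount=%d bytes%n",
                    n, bytesReadCollectAll(n), bytesReadSumCount(n));
        }
    }
}
```

Doubling n roughly quadruples the bytes rebuilt for the collecting accumulator, while the sum-and-count accumulator grows linearly. If the real accumulator holds hundreds of thousands of Y objects, that churn alone could plausibly drive the JVM into "GC overhead limit exceeded", whatever the configured heap size.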

Re: memory tuning

Matthias
Thanks for sharing the logs. The configuration looks fine. Have you analyzed the memory usage?
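
A low-effort first step for analyzing heap usage, before reaching for a heap dump, is to read the JVM's own counters. The sketch below uses only the standard java.lang.management API. The class name HeapSnapshot is made up for illustration. It would have to run inside the TaskManager JVM, for example from an operator's open() method, which is an assumption about how one would wire it in.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper; not part of Flink.
final class HeapSnapshot {

    /** Current heap usage as seen by the JVM this code runs in. */
    static MemoryUsage heap() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
    }

    /** Accumulated collection time over all collectors, in milliseconds. */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime(); // -1 when the collector does not report it
            if (millis > 0) {
                total += millis;
            }
        }
        return total;
    }

    /** One-line summary suitable for a log statement. */
    static String describe() {
        MemoryUsage heap = heap();
        long mib = 1024L * 1024L;
        // getMax() is -1 when the maximum is undefined.
        return String.format("heap used=%d MiB, committed=%d MiB, max=%d MiB, total GC time=%d ms",
                heap.getUsed() / mib, heap.getCommitted() / mib, heap.getMax() / mib, totalGcMillis());
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Logging this summary periodically shows whether used heap stays near the maximum while GC time keeps climbing, which is the pattern behind "GC overhead limit exceeded". For a full picture of which objects fill the heap, the JVM flag -XX:+HeapDumpOnOutOfMemoryError (passed through env.java.opts in flink-conf.yaml) produces a dump that a heap analyzer can open.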

On Tue, Jan 26, 2021 at 5:02 PM Marco Villalobos <[hidden email]> wrote:
Yes, I will do that. 

PRODUCTION

2021-01-26 04:03:50,804 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] - --------------------------------------------------------------------------------
2021-01-26 04:03:50,807 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  Starting YARN TaskExecutor runner (Version: 1.11.0, Scala: 2.12, Rev:d04872d, Date:2020-06-29T16:13:14+02:00)
2021-01-26 04:03:50,807 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  OS current user: yarn
2021-01-26 04:03:50,937 WARN  org.apache.hadoop.util.NativeCodeLoader                      [] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-01-26 04:03:50,987 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  Current Hadoop/Kerberos user: hadoop
2021-01-26 04:03:50,987 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  JVM: OpenJDK 64-Bit Server VM - Amazon.com Inc. - 1.8/25.252-b09
2021-01-26 04:03:50,987 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  Maximum heap size: 3289 MiBytes
2021-01-26 04:03:50,988 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  JAVA_HOME: /etc/alternatives/jre
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  Hadoop version: 3.2.1
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  JVM Options:
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Xmx3597035049
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Xms3597035049
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -XX:MaxDirectMemorySize=880468305
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -XX:MaxMetaspaceSize=268435456
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Dlog.file=/var/log/hadoop-yarn/containers/application_1611280261341_0015/container_1611280261341_0015_01_000004/taskmanager.log
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Dlog4j.configuration=file:./log4j.properties
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Dlog4j.configurationFile=file:./log4j.properties
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  Program Arguments:
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.framework.off-heap.size=134217728b
2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.network.max=746250577b
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.network.min=746250577b
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.framework.heap.size=134217728b
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.managed.size=2985002310b
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.cpu.cores=1.0
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.task.heap.size=3462817321b
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.task.off-heap.size=0b
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     --configDir
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     .
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Djobmanager.rpc.address=ip-10-42-1-200.us-west-2.compute.internal
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Dweb.port=0
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Dweb.tmpdir=/tmp/flink-web-af05fbc0-d86d-4c74-9467-e86791a20053
2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Djobmanager.rpc.port=36491
2021-01-26 04:03:50,991 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Drest.address=ip-10-42-1-200.us-west-2.compute.internal

// classpath output

2021-01-26 04:03:50,994 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] - --------------------------------------------------------------------------------
2021-01-26 04:03:50,995 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] - Registered UNIX signal handlers for [TERM, HUP, INT]
2021-01-26 04:03:50,997 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] - Current working Directory: /mnt/yarn/usercache/hadoop/appcache/application_1611280261341_0015/container_1611280261341_0015_01_000004
2021-01-26 04:03:51,007 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 8g
2021-01-26 04:03:51,007 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: internal.jobgraph-path, job.graph
2021-01-26 04:03:51,007 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: env.yarn.conf.dir, /etc/hadoop/conf
2021-01-26 04:03:51,007 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2021-01-26 04:03:51,007 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: high-availability.cluster-id, application_1611280261341_0015
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.target, yarn-per-job
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 4g
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.preallocate, false
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.savepoint.ignore-unclaimed-state, false
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.attached, true
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: internal.cluster.execution-mode, NORMAL
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.shutdown-on-attached-exit, false
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: pipeline.jars, file:/home/hadoop/bp-whiting/develop-17e9fd0e.jar
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 2
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
2021-01-26 04:03:51,008 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
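
To make the production numbers easier to read, the sketch below adds up the byte values from the startup arguments above and relates them to the 8g process size. The constants are copied from the log. The class name and the remainder() helper are hypothetical, and treating the remainder as JVM overhead assumes that the components add up to the process size, as they do in the "Final TaskExecutor Memory configuration" table of the local logs.

```java
// Hypothetical helper that only restates numbers from the production log.
final class ProductionMemoryBudget {
    static final long PROCESS = 8L * 1024 * 1024 * 1024;   // taskmanager.memory.process.size, 8g
    static final long FRAMEWORK_HEAP = 134_217_728L;       // taskmanager.memory.framework.heap.size
    static final long TASK_HEAP = 3_462_817_321L;          // taskmanager.memory.task.heap.size
    static final long MANAGED = 2_985_002_310L;            // taskmanager.memory.managed.size
    static final long FRAMEWORK_OFF_HEAP = 134_217_728L;   // taskmanager.memory.framework.off-heap.size
    static final long TASK_OFF_HEAP = 0L;                  // taskmanager.memory.task.off-heap.size
    static final long NETWORK = 746_250_577L;              // taskmanager.memory.network.min/max
    static final long METASPACE = 268_435_456L;            // -XX:MaxMetaspaceSize

    /** JVM heap; matches -Xmx3597035049 in the log. */
    static long heap() {
        return FRAMEWORK_HEAP + TASK_HEAP;
    }

    /** JVM direct memory; matches -XX:MaxDirectMemorySize=880468305 in the log. */
    static long direct() {
        return FRAMEWORK_OFF_HEAP + TASK_OFF_HEAP + NETWORK;
    }

    /** What is left of the process size (assumed here to be the JVM overhead). */
    static long remainder() {
        return PROCESS - heap() - MANAGED - direct() - METASPACE;
    }

    static double percentOfProcess(long bytes) {
        return 100.0 * bytes / PROCESS;
    }

    public static void main(String[] args) {
        System.out.printf("heap      %d bytes (%.1f%%)%n", heap(), percentOfProcess(heap()));
        System.out.printf("managed   %d bytes (%.1f%%)%n", MANAGED, percentOfProcess(MANAGED));
        System.out.printf("direct    %d bytes (%.1f%%)%n", direct(), percentOfProcess(direct()));
        System.out.printf("metaspace %d bytes (%.1f%%)%n", METASPACE, percentOfProcess(METASPACE));
        System.out.printf("remainder %d bytes (%.1f%%)%n", remainder(), percentOfProcess(remainder()));
    }
}
```

Only about 42 percent of the 8g container is JVM heap, and about 35 percent is managed memory. "GC overhead limit exceeded" is a heap error, so the managed share does not help with it. Assuming the RocksDB backend is left at its default of drawing from managed memory, lowering taskmanager.memory.managed.fraction or setting taskmanager.memory.task.heap.size explicitly would be the usual way to shift the balance toward the heap.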


LOGS FROM LOCAL

2021-01-25 21:41:18,023 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - --------------------------------------------------------------------------------
2021-01-25 21:41:18,025 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Preconfiguration: 
2021-01-25 21:41:18,025 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - 


TM_RESOURCE_PARAMS extraction logs:
jvm_params: -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456
dynamic_configs: -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b
logs: WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
INFO  [] - Loading configuration property: jobmanager.rpc.address, localhost
INFO  [] - Loading configuration property: jobmanager.rpc.port, 6123
INFO  [] - Loading configuration property: jobmanager.memory.process.size, 1600m
INFO  [] - Loading configuration property: taskmanager.memory.process.size, 1728m
INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
INFO  [] - Loading configuration property: parallelism.default, 1
INFO  [] - Loading configuration property: jobmanager.execution.failover-strategy, region
INFO  [] - Loading configuration property: s3.endpoint, http://localhost:4566
INFO  [] - Loading configuration property: s3.path.style.access, true
INFO  [] - Loading configuration property: taskmanager.memory.process.size, 4g
INFO  [] - Final TaskExecutor Memory configuration:
INFO  [] -   Total Process Memory:          4.000gb (4294967296 bytes)
INFO  [] -     Total Flink Memory:          3.350gb (3597035104 bytes)
INFO  [] -       Total JVM Heap Memory:     1.550gb (1664299798 bytes)
INFO  [] -         Framework:               128.000mb (134217728 bytes)
INFO  [] -         Task:                    1.425gb (1530082070 bytes)
INFO  [] -       Total Off-heap Memory:     1.800gb (1932735306 bytes)
INFO  [] -         Managed:                 1.340gb (1438814063 bytes)
INFO  [] -         Total JVM Direct Memory: 471.040mb (493921243 bytes)
INFO  [] -           Framework:             128.000mb (134217728 bytes)
INFO  [] -           Task:                  0 bytes
INFO  [] -           Network:               343.040mb (359703515 bytes)
INFO  [] -     JVM Metaspace:               256.000mb (268435456 bytes)
INFO  [] -     JVM Overhead:                409.600mb (429496736 bytes)
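
The jvm_params and dynamic_configs lines of this preconfiguration block describe the same budget twice, so one can be checked against the other. The sketch below parses the dynamic_configs string as logged and recomputes -Xmx and -XX:MaxDirectMemorySize from it. DynamicConfigCheck and its methods are hypothetical names, and the parsing assumes every memory value is a plain byte count with a trailing "b", as it is in this log.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; the string is copied from the dynamic_configs log line.
final class DynamicConfigCheck {
    static final String DYNAMIC_CONFIGS =
            "-D taskmanager.memory.framework.off-heap.size=134217728b "
            + "-D taskmanager.memory.network.max=359703515b "
            + "-D taskmanager.memory.network.min=359703515b "
            + "-D taskmanager.memory.framework.heap.size=134217728b "
            + "-D taskmanager.memory.managed.size=1438814063b "
            + "-D taskmanager.cpu.cores=1.0 "
            + "-D taskmanager.memory.task.heap.size=1530082070b "
            + "-D taskmanager.memory.task.off-heap.size=0b";

    /** Maps each key with a byte-valued setting to its size in bytes. */
    static Map<String, Long> parseBytes(String dynamicConfigs) {
        Map<String, Long> sizes = new LinkedHashMap<>();
        for (String token : dynamicConfigs.trim().split("\\s+")) {
            int eq = token.indexOf('=');
            if (eq < 0 || !token.endsWith("b")) {
                continue; // skips the "-D" markers and non-byte values such as cpu.cores=1.0
            }
            String digits = token.substring(eq + 1, token.length() - 1);
            sizes.put(token.substring(0, eq), Long.parseLong(digits));
        }
        return sizes;
    }

    /** Framework heap plus task heap; should equal -Xmx. */
    static long expectedXmx(Map<String, Long> sizes) {
        return sizes.get("taskmanager.memory.framework.heap.size")
                + sizes.get("taskmanager.memory.task.heap.size");
    }

    /** Framework off-heap plus task off-heap plus network; should equal MaxDirectMemorySize. */
    static long expectedMaxDirect(Map<String, Long> sizes) {
        return sizes.get("taskmanager.memory.framework.off-heap.size")
                + sizes.get("taskmanager.memory.task.off-heap.size")
                + sizes.get("taskmanager.memory.network.max");
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = parseBytes(DYNAMIC_CONFIGS);
        System.out.println("-Xmx" + expectedXmx(sizes));
        System.out.println("-XX:MaxDirectMemorySize=" + expectedMaxDirect(sizes));
    }
}
```

Both recomputed values agree with the jvm_params line (1664299798 and 493921243). The local heap is therefore about 1.55 GB out of the 4g process size, and the 1.34 GB of managed memory sits outside the heap.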

2021-01-25 21:41:18,025 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - --------------------------------------------------------------------------------
2021-01-25 21:41:18,025 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Starting TaskManager (Version: 1.11.0, Scala: 2.12, Rev:d04872d, Date:2020-06-29T16:13:14+02:00)
2021-01-25 21:41:18,025 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  OS current user: mvillalobos
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Current Hadoop/Kerberos user: <no hadoop dependency found>
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  JVM: OpenJDK 64-Bit Server VM - Azul Systems, Inc. - 11/11.0.6+10-LTS
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Maximum heap size: 1588 MiBytes
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  JAVA_HOME: /Users/mvillalobos/.sdkman/candidates/java/current
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  No Hadoop Dependency available
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  JVM Options:
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -XX:+UseG1GC
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Xmx1664299798
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Xms1664299798
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -XX:MaxDirectMemorySize=493921243
2021-01-25 21:41:18,026 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -XX:MaxMetaspaceSize=268435456
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dlog.file=/Users/mvillalobos/dev/flink/flink-1.11.0/log/flink-mvillalobos-taskexecutor-1-mvillalobos.beyond.ai.log
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dlog4j.configuration=file:/Users/mvillalobos/dev/flink/flink-1.11.0/conf/log4j.properties
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dlog4j.configurationFile=file:/Users/mvillalobos/dev/flink/flink-1.11.0/conf/log4j.properties
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dlogback.configurationFile=file:/Users/mvillalobos/dev/flink/flink-1.11.0/conf/logback.xml
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Program Arguments:
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     --configDir
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     /Users/mvillalobos/dev/flink/flink-1.11.0/conf
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.framework.off-heap.size=134217728b
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,027 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.network.max=359703515b
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.network.min=359703515b
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.framework.heap.size=134217728b
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.managed.size=1438814063b
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.cpu.cores=1.0
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.task.heap.size=1530082070b
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -D
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     taskmanager.memory.task.off-heap.size=0b
2021-01-25 21:41:18,028 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Classpath: /Users/mvillalobos/dev/flink/flink-1.11.0/lib/flink-csv-1.11.0.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/flink-json-1.11.0.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/flink-shaded-zookeeper-3.4.14.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/flink-table-blink_2.12-1.11.0.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/flink-table_2.12-1.11.0.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/log4j-1.2-api-2.12.1.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/log4j-api-2.12.1.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/log4j-core-2.12.1.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/log4j-slf4j-impl-2.12.1.jar:/Users/mvillalobos/dev/flink/flink-1.11.0/lib/flink-dist_2.12-1.11.0.jar:::
2021-01-25 21:41:18,029 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - --------------------------------------------------------------------------------
2021-01-25 21:41:18,030 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Registered UNIX signal handlers for [TERM, HUP, INT]
2021-01-25 21:41:18,034 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Maximum number of open file descriptors is 10240.
2021-01-25 21:41:18,045 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, localhost
2021-01-25 21:41:18,045 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123
2021-01-25 21:41:18,046 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2021-01-25 21:41:18,046 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2021-01-25 21:41:18,046 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2021-01-25 21:41:18,046 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 1
2021-01-25 21:41:18,046 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2021-01-25 21:41:18,047 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3.endpoint, http://localhost:4566
2021-01-25 21:41:18,048 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3.path.style.access, true
2021-01-25 21:41:18,048 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 4g
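
The log above loads taskmanager.memory.process.size twice (1728m, then 4g). As a rough guide to what either value means for the JVM heap, here is a minimal sketch of the split Flink derives from the process size. The fractions and fixed sizes are assumptions: they are meant to match the Flink 1.11 defaults and will be off if flink-conf.yaml overrides any of them. The class and method names are made up for illustration.

```java
/** Back-of-the-envelope split of taskmanager.memory.process.size (all values in MiB). */
final class TaskManagerMemorySketch {
    // ASSUMPTION: Flink 1.11 default settings; any of these can be overridden in flink-conf.yaml.
    static final double METASPACE = 256;
    static final double OVERHEAD_FRACTION = 0.1, OVERHEAD_MIN = 192, OVERHEAD_MAX = 1024;
    static final double MANAGED_FRACTION = 0.4;
    static final double NETWORK_FRACTION = 0.1, NETWORK_MIN = 64, NETWORK_MAX = 1024;
    static final double FRAMEWORK_HEAP = 128, FRAMEWORK_OFF_HEAP = 128;

    static double clamp(double value, double min, double max) {
        return Math.max(min, Math.min(max, value));
    }

    /** Total Flink memory: process size minus metaspace and JVM overhead. */
    static double totalFlinkMiB(double processMiB) {
        double overhead = clamp(processMiB * OVERHEAD_FRACTION, OVERHEAD_MIN, OVERHEAD_MAX);
        return processMiB - METASPACE - overhead;
    }

    /** Managed (off-heap) memory, which the RocksDB state backend uses by default. */
    static double managedMiB(double processMiB) {
        return totalFlinkMiB(processMiB) * MANAGED_FRACTION;
    }

    /** JVM heap: framework heap plus whatever is left over for task heap. */
    static double heapMiB(double processMiB) {
        double flink = totalFlinkMiB(processMiB);
        double network = clamp(flink * NETWORK_FRACTION, NETWORK_MIN, NETWORK_MAX);
        double taskHeap = flink - managedMiB(processMiB) - network
                - FRAMEWORK_HEAP - FRAMEWORK_OFF_HEAP;
        return FRAMEWORK_HEAP + taskHeap;
    }

    public static void main(String[] args) {
        for (double processMiB : new double[] {1728, 4096}) {
            System.out.printf("process=%.0f MiB -> heap ~%.0f MiB, managed ~%.0f MiB%n",
                    processMiB, heapMiB(processMiB), managedMiB(processMiB));
        }
    }
}
```

Under these assumptions a 1728m process leaves only about 512 MiB of JVM heap, and a 4g process about 1587 MiB, so the heap is considerably smaller than the configured process size in both cases. The preconfiguration summary printed at TaskManager start-up is the authoritative source for the real numbers.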


On Tue, Jan 26, 2021 at 12:34 AM Matthias Pohl <[hidden email]> wrote:
Hi Marco,
Could you share the preconfiguration logs? They are printed at the beginning of the TaskManagers' logs and contain a summary of the memory configuration in use.

Best,
Matthias

On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <[hidden email]> wrote:

I have a Flink job that collects and aggregates time-series data from many devices into one object (let's call it X) that is collected by a window.

X contains time-series data, so it holds many String and Instant objects, a HashMap, and many objects of another type (let's call it Y).

When I collect 4 X instances containing 800,000 Y instances, that amounts to approximately 172 MB of data.

That should be okay, because my machine has 32 GB of RAM, and I allocated 1.5 GB to each task manager.
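
To make the sizes above concrete, here is a small sketch of the arithmetic. It assumes the 172 MB figure covers all 800,000 Y instances together and treats MB as MiB. The message does not say whether 172 MB is the serialized size or the on-heap size, so the per-object figure is only an average of whatever was measured. The class and method names are hypothetical.

```java
/** Rough arithmetic on the figures quoted in the message. */
final class PayloadSizeSketch {
    static final long MIB = 1024L * 1024L;

    /** Average bytes per Y instance for a payload of totalMiB spread over yCount instances. */
    static double bytesPerY(long totalMiB, long yCount) {
        return (double) (totalMiB * MIB) / yCount;
    }

    /** Fraction of a memory budget that the payload occupies. */
    static double shareOf(long payloadMiB, long budgetMiB) {
        return (double) payloadMiB / budgetMiB;
    }

    public static void main(String[] args) {
        // ASSUMPTION: 172 MB is the total for all 800,000 Y instances.
        System.out.printf("~%.0f bytes per Y%n", bytesPerY(172, 800_000));
        // ASSUMPTION: "1.5 GB" means 1536 MiB for the whole TaskManager process.
        System.out.printf("%.1f%% of the 1.5 GB process size%n", 100 * shareOf(172, 1536));
    }
}
```

The payload is a small share of the process size, but the process size is not the same as the JVM heap, and deserialized Java objects usually take more room than their serialized form.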

However, it fails with out-of-memory errors, and I think they occur during serialization, though I am not sure whether that is a coincidence.

I am using the RocksDB state backend, as well as Kryo serialization.

I am already refactoring my code from processing-time semantics to event-time semantics, and I am trying to store smaller objects in keyed state rather than this one large object. In the meantime, our machines have plenty of memory. What can I do to fix this?

SAMPLE STACK TRACE

2020-12-17 02:45:55,524 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
        at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1088) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1062) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1183) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: java.lang.OutOfMemoryError: GC overhead limit exceeded
        ... 12 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
2020-12-17 02:45:55,554 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Triggering cancellation of task code enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d).
2020-12-17 02:45:38,981 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - aggregate daily average window function (2/2) (745ff4669f9c1812de5b717c87a36a26) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.GeneratedSerializationConstructorAccessor230.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_252]
        at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[965ae4d.jar:?]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[965ae4d.jar:?]
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.getInternal(AbstractRocksDBAppendingState.java:64) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.contrib.streaming.state.RocksDBAggregatingState.add(RocksDBAggregatingState.java:101) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$539/1319281920.runDefaultAction(Unknown Source) ~[?:?]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
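
The second trace runs through RocksDBAggregatingState.add into getInternal and then KryoSerializer.deserialize, which suggests that each element added to the window reads back and deserializes the whole accumulator before updating it. The sketch below imitates that read-modify-write pattern with plain java.io serialization and a HashMap. That is a stand-in chosen only so the example runs without Flink; it is not Kryo or RocksDB, and the class and method names are hypothetical. It shows how the total number of bytes deserialized grows as the accumulator gets larger.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;

/** Stand-in for a state backend that stores the accumulator as serialized bytes. */
final class ReadModifyWriteSketch {
    static byte[] serialize(HashMap<Integer, String> accumulator) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(accumulator);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static HashMap<Integer, String> deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (HashMap<Integer, String>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Adds n elements one by one, round-tripping the whole accumulator each time. */
    static long totalBytesRead(int n) {
        byte[] stored = serialize(new HashMap<>());
        long bytesRead = 0;
        for (int i = 0; i < n; i++) {
            bytesRead += stored.length;                       // read the full accumulator
            HashMap<Integer, String> accumulator = deserialize(stored);
            accumulator.put(i, "reading-" + i);               // apply one small update
            stored = serialize(accumulator);                  // write the full accumulator back
        }
        return bytesRead;
    }

    public static void main(String[] args) {
        for (int n : new int[] {100, 200, 400}) {
            System.out.println(n + " adds -> " + totalBytesRead(n) + " bytes deserialized");
        }
    }
}
```

Doubling the number of adds far more than doubles the bytes deserialized, and every round trip also allocates a fresh copy of the map. With a large accumulator this churn puts pressure on the garbage collector, which is one plausible reading of the "GC overhead limit exceeded" errors. Keeping the accumulator small, or storing many small entries instead of one large object, avoids the repeated full round trip.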