Flink 1.6 ExecutionJobVertex.getTaskInformationOrBlobKey OutOfMemoryError

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 1.6 ExecutionJobVertex.getTaskInformationOrBlobKey OutOfMemoryError

杨力
I used to runFlink SQL in streaming mode with more than 70 sqls in version 1.4. With so many sqls loaded, akka.framesize has to be set to 200 MB to submit the job.

When I am trying to run the job with flink 1.6.0, the HTTP-based job submission works perfectly but an OutOfMemoryError is thrown when tasks are being depolyed.

java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
        at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
        at org.apache.flink.runtime.blob.BlobWriter.serializeAndTryOffload(BlobWriter.java:99)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getTaskInformationOrBlobKey(ExecutionJobVertex.java:393)
        at org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:827)
        at org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:580)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$2(ExecutionGraph.java:963)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph$$Lambda$105/800937955.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:541)
        at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture$$Lambda$92/1432873073.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
        at akka.dispatch.OnComplete.internal(Future.scala:259)
        at akka.dispatch.OnComplete.internal(Future.scala:256)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)

This OOM error raises even with a 12GB heap. I have dived into source code, only found that ExecutionJobVertex.getTaskInformationOrBlobKey is serializing a TaskInformation object, which seems not to be a large one. Can anyone help me to fix or work around the problem?
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.6 ExecutionJobVertex.getTaskInformationOrBlobKey OutOfMemoryError

Hequn Cheng
Hi,

Have you ever increased the memory of job master? 
If you run a flink job on yarn, you can increase job master's memory by "-yjm 1024m"[1]. 

Best, Hequn


On Mon, Aug 13, 2018 at 10:25 PM, 杨力 <[hidden email]> wrote:
I used to runFlink SQL in streaming mode with more than 70 sqls in version 1.4. With so many sqls loaded, akka.framesize has to be set to 200 MB to submit the job.

When I am trying to run the job with flink 1.6.0, the HTTP-based job submission works perfectly but an OutOfMemoryError is thrown when tasks are being depolyed.

java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
        at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
        at org.apache.flink.runtime.blob.BlobWriter.serializeAndTryOffload(BlobWriter.java:99)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getTaskInformationOrBlobKey(ExecutionJobVertex.java:393)
        at org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:827)
        at org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:580)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$2(ExecutionGraph.java:963)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph$$Lambda$105/800937955.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:541)
        at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture$$Lambda$92/1432873073.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
        at akka.dispatch.OnComplete.internal(Future.scala:259)
        at akka.dispatch.OnComplete.internal(Future.scala:256)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)

This OOM error raises even with a 12GB heap. I have dived into source code, only found that ExecutionJobVertex.getTaskInformationOrBlobKey is serializing a TaskInformation object, which seems not to be a large one. Can anyone help me to fix or work around the problem?

Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.6 ExecutionJobVertex.getTaskInformationOrBlobKey OutOfMemoryError

杨力
Thanks for the tip! It works.

I forgot the job manager.

Hequn Cheng <[hidden email]> 于 2018年8月14日周二 上午9:15写道:
Hi,

Have you ever increased the memory of job master? 
If you run a flink job on yarn, you can increase job master's memory by "-yjm 1024m"[1]. 

Best, Hequn


On Mon, Aug 13, 2018 at 10:25 PM, 杨力 <[hidden email]> wrote:
I used to runFlink SQL in streaming mode with more than 70 sqls in version 1.4. With so many sqls loaded, akka.framesize has to be set to 200 MB to submit the job.

When I am trying to run the job with flink 1.6.0, the HTTP-based job submission works perfectly but an OutOfMemoryError is thrown when tasks are being depolyed.

java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
        at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
        at org.apache.flink.runtime.blob.BlobWriter.serializeAndTryOffload(BlobWriter.java:99)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getTaskInformationOrBlobKey(ExecutionJobVertex.java:393)
        at org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:827)
        at org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:580)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$2(ExecutionGraph.java:963)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph$$Lambda$105/800937955.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:541)
        at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture$$Lambda$92/1432873073.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
        at akka.dispatch.OnComplete.internal(Future.scala:259)
        at akka.dispatch.OnComplete.internal(Future.scala:256)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)

This OOM error raises even with a 12GB heap. I have dived into source code, only found that ExecutionJobVertex.getTaskInformationOrBlobKey is serializing a TaskInformation object, which seems not to be a large one. Can anyone help me to fix or work around the problem?