flink akka OversizedPayloadException error


Ali, Kasif

Hello,

 

We have added serializer code that registers all the schemas with the executionEnvironment and is called before execute(). This makes sure that all the Avro schemas are pre-registered and cached on the executors before actual execution begins. Now, while submitting the job, we are getting the error below.

executionEnvironment.addDefaultKryoSerializer(myKryo.class, largeSerializerObject);

Error:

13223 [flink-akka.actor.default-dispatcher-5] ERROR akka.remote.EndpointWriter - Transient association error (association remains live)

akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://[hidden email]:54840/user/jobmanager#1440921956]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 45388747 bytes.

 

We followed the documentation and increased akka.framesize to 200 MB when starting the YARN cluster, but the job still fails with the above error.

 

akka.framesize: Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier (DEFAULT: 10485760b).

 

 

Command to start the YARN cluster:

./yarn-session.sh -n 15 -tm 24576 -jm 6144 -s 7 -Djobmanager.web.history=100 -Dakka.ask.timeout=10min -Dtaskmanager.network.numberOfBuffers=8000 -Dakka.framesize=209715200b
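
The value 209715200b in the command is 200 MiB written out in bytes, which should be large enough for the 45388747-byte message in the log, while the 10485760b default is not. A minimal Java sketch of that arithmetic follows; the class and method names (FrameSizeMath, toFrameSizeValue, parseFrameSize) are made up for illustration and are not part of Flink.

```java
// Illustrative helper only; not a Flink API.
final class FrameSizeMath {

    /** Converts mebibytes to the "<bytes>b" form used for akka.framesize in this thread. */
    static String toFrameSizeValue(long mebibytes) {
        return (mebibytes * 1024L * 1024L) + "b";
    }

    /** Parses a "<bytes>b" value back into a plain byte count. */
    static long parseFrameSize(String value) {
        String trimmed = value.trim();
        if (!trimmed.endsWith("b")) {
            throw new IllegalArgumentException("expected a value ending in 'b': " + value);
        }
        return Long.parseLong(trimmed.substring(0, trimmed.length() - 1));
    }

    public static void main(String[] args) {
        long payload = 45_388_747L;                        // actual size reported in the error
        long defaultLimit = parseFrameSize("10485760b");   // documented default
        long configured = parseFrameSize(toFrameSizeValue(200));

        System.out.println("configured value: " + toFrameSizeValue(200));
        System.out.println("exceeds default:  " + (payload > defaultLimit));
        System.out.println("fits configured:  " + (payload <= configured));
    }
}
```

So the configured size itself is sufficient; the open question is whether every process involved actually picks it up.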

 

 

We are using Flink version 1.2.0.

 

 

Can you confirm whether this is the correct config for increasing the Akka frame size, and whether it is the right config to fix this issue?

 

Thanks,

Kasif

 


Re: flink akka OversizedPayloadException error

Till Rohrmann

Hi Kasif,

using the akka.framesize configuration option is the right way to solve the problem with large akka messages. I’ve tested what you’ve described with the latest 1.2 release branch and it worked for me.

In order to track down your problem I need some more information. First of all you can check in the web UI under “Job Manager” → “Configuration” whether the JobManager has been started with the right parameter value. I assume that this is the case.

Next I’d like to know how you submit your job to the Flink Yarn cluster. If you don’t submit the job from the node where you’ve started the Yarn cluster, then the client won’t find the /tmp/.yarn-properties-hadoop file. This file contains the dynamic properties you’ve configured the cluster with (framesize). Thus, it will only load the normal flink-conf.yaml file and load the default value for the akka framesize (10 MB). In order to change this, you would have to insert the configuration value akka.framesize: 209715200b into the flink-conf.yaml file so that the client also knows how big the framesize should be.
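
To make the three situations concrete, here is a small Java model of which frame size the submitting client ends up with. It is only a sketch and not Flink's actual configuration loader: the class ClientFrameSize, its methods, and the rule that the dynamic properties take precedence over flink-conf.yaml are assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative model only: this is NOT Flink's configuration loading code.
final class ClientFrameSize {
    static final String KEY = "akka.framesize";
    static final String DEFAULT_VALUE = "10485760b"; // default quoted from the docs

    /** Parses simple "key: value" lines; blank lines and '#' comments are skipped. */
    static Map<String, String> parseConf(String text) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : text.split("\n")) {
            String trimmed = line.trim();
            int colon = trimmed.indexOf(':');
            if (trimmed.isEmpty() || trimmed.startsWith("#") || colon <= 0) {
                continue;
            }
            conf.put(trimmed.substring(0, colon).trim(), trimmed.substring(colon + 1).trim());
        }
        return conf;
    }

    /**
     * Assumed lookup order: dynamic properties (only if the client can see the
     * properties file), then flink-conf.yaml, then the built-in default.
     */
    static long effectiveFrameSize(Map<String, String> flinkConf,
                                   Optional<Map<String, String>> dynamicProps) {
        String value = dynamicProps
                .map(props -> props.get(KEY))
                .orElseGet(() -> flinkConf.getOrDefault(KEY, DEFAULT_VALUE));
        return Long.parseLong(value.substring(0, value.length() - 1)); // strip trailing 'b'
    }

    public static void main(String[] args) {
        long payload = 45_388_747L; // message size from the error log
        Map<String, String> dynamic = parseConf("akka.framesize: 209715200b");
        Map<String, String> stockConf = parseConf("# no akka.framesize entry\njobmanager.web.history: 100");
        Map<String, String> fixedConf = parseConf("akka.framesize: 209715200b");

        System.out.println("same node, properties file found: "
                + (payload <= effectiveFrameSize(stockConf, Optional.of(dynamic))));
        System.out.println("other node, stock flink-conf.yaml: "
                + (payload <= effectiveFrameSize(stockConf, Optional.empty())));
        System.out.println("other node, framesize in flink-conf.yaml: "
                + (payload <= effectiveFrameSize(fixedConf, Optional.empty())));
    }
}
```

Under these assumptions only the second case, a client on another node with an unmodified flink-conf.yaml, falls back to the 10 MB default and rejects the 45388747-byte message.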

I hope this solves your problem. If not, then it would be great if you could share the logs with me.

Cheers,
Till


On Wed, Mar 15, 2017 at 2:26 PM, Ali, Kasif <[hidden email]> wrote:

[...]


RE: flink akka OversizedPayloadException error

Ali, Kasif

Thanks Till

 

It worked after updating the property in the flink-conf.yaml file.

 

Thanks,

Kasif

 

 

From: Till Rohrmann [mailto:[hidden email]]
Sent: Wednesday, March 15, 2017 9:08 PM
To: [hidden email]
Subject: Re: flink akka OversizedPayloadException error

 

[...]