Failed to serialize remote message [class org.apache.flink.runtime.messages.JobManagerMessages$JobFound

jelmer
Hi,

We recently upgraded our test environment from Flink 1.3.2 to Flink 1.4.0.

We are using a high-availability setup for the JobManager. Since the upgrade, when I go to the job details in the web UI, the call often times out and the following error pops up in the JobManager log:


akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class org.apache.flink.runtime.messages.JobManagerMessages$JobFound] using serializer [class akka.serialization.JavaSerializer].
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:889) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:889) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:888) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:780) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:755) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:502) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) [flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:495) [flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) [flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:224) [flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) [flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.4.0.jar:1.4.0]
Caused by: java.io.NotSerializableException: org.apache.flink.runtime.executiongraph.ExecutionGraph
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[na:1.8.0_131]
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[na:1.8.0_131]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[na:1.8.0_131]
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[na:1.8.0_131]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[na:1.8.0_131]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[na:1.8.0_131]
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:321) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:321) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:321) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:321) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:47) ~[flink-dist_2.11-1.4.0.jar:1.4.0]
... 17 common frames omitted



I isolated it further, and it seems to be triggered by this call:

https://hostname/jobs/28076fffbcf7eab3f17900a54cc7c41d

I cannot reproduce it on my local laptop, which runs without an HA setup.
Before I dig any deeper, has anyone already come across this?

Re: Failed to serialize remote message [class org.apache.flink.runtime.messages.JobManagerMessages$JobFound

Nico Kruber
IMHO, this looks like a bug and it makes sense that you only see this
with an HA setup:

The JobFound message contains the ExecutionGraph, which, however, does
not implement the Serializable interface. Without HA, when browsing the
web interface, this message is (probably) never serialized, since it
stays inside the JobManager process and is only rendered for you as HTML.
With HA, the message may come from a different JobManager than the one
whose web interface you are browsing, so it has to be serialized and sent
over the network.
I'm including Till (cc'd) as he might know more.
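
The failure mode described above can be reproduced with plain JDK serialization. This is a minimal sketch, not Flink code: FakeJobFound and FakeExecutionGraph are hypothetical stand-ins, and toBytes only approximates Akka's JavaSerializer, which (judging by the JavaSerializer.toBinary frames in the trace) writes the message with an ObjectOutputStream. A message class that implements Serializable still fails as soon as one of its fields does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationDemo {

    // Hypothetical stand-in for Flink's ExecutionGraph: deliberately NOT Serializable.
    static class FakeExecutionGraph {
        final String jobName;

        FakeExecutionGraph(String jobName) {
            this.jobName = jobName;
        }
    }

    // Hypothetical stand-in for the JobFound reply: the message type is
    // Serializable, but the graph it carries is not.
    static class FakeJobFound implements Serializable {
        private static final long serialVersionUID = 1L;
        final String jobId;
        final FakeExecutionGraph graph;

        FakeJobFound(String jobId, FakeExecutionGraph graph) {
            this.jobId = jobId;
            this.graph = graph;
        }
    }

    // Roughly what a Java-serialization transport does before a message
    // leaves the JVM: write the entire object graph to bytes.
    static byte[] toBytes(Object message) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(message);
        }
        return buffer.toByteArray();
    }

    // Returns the name of the class that blocked serialization, or null on success.
    static String remoteSendFailure(Object message) {
        try {
            toBytes(message);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage(); // the JDK puts the offending class name here
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FakeJobFound reply = new FakeJobFound("28076fffbcf7eab3f17900a54cc7c41d", new FakeExecutionGraph("demo"));
        // Same-JVM delivery would hand this reference over as-is; nothing is serialized.
        // Remote delivery has to turn it into bytes first, which fails on the nested graph.
        System.out.println("remote send failed on: " + remoteSendFailure(reply));
    }
}
```

Passing the same object between actors inside one JVM never touches ObjectOutputStream, which fits the observation that the error only shows up when the web frontend and the JobManager holding the job are different processes.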


Nico

On 16/01/18 09:22, jelmer wrote:


Re: Failed to serialize remote message [class org.apache.flink.runtime.messages.JobManagerMessages$JobFound

Till Rohrmann
Hi,

this indeed indicates that a REST handler is requesting the ExecutionGraph from a JobManager that does not run in the same ActorSystem. Could you please tell us the exact HA setup? Are you running Flink on YARN with HA, or do you use standalone HA with standby JobManagers?

It would be really helpful if you could also share the logs with us.

Cheers,
Till

On Tue, Jan 16, 2018 at 10:20 AM, Nico Kruber <[hidden email]> wrote:

Re: Failed to serialize remote message [class org.apache.flink.runtime.messages.JobManagerMessages$JobFound

jelmer
I think I found the issue. I'd just like to verify that my reasoning is correct.

We had the following keys in our flink-conf.yaml:

jobmanager.web.address: localhost
jobmanager.web.port: 8081

This worked on Flink 1.3.2.

But on Flink 1.4.0, this check


makes both the master and the standby think that they don't need to perform a redirect, which means that the standby node will serve web traffic.

I am assuming that this is never supposed to happen (because the standby would then call remote actor systems), so this class not being serializable is not a bug.
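
The reasoning above can be sketched in a few lines. The check jelmer linked is not preserved in this archive, so shouldRedirect below is a hypothetical simplification, assuming the decision boils down to comparing the local web endpoint with the leader's advertised one. When every JobManager is configured with the same localhost:8081, that comparison can never tell them apart.

```java
import java.util.Objects;

class RedirectDecision {

    // Hypothetical redirect rule (not Flink's actual code): a JobManager's web
    // frontend redirects to the leader only if the leader's advertised web
    // endpoint differs from its own configured endpoint.
    static boolean shouldRedirect(String ownWebEndpoint, String leaderWebEndpoint) {
        return !Objects.equals(ownWebEndpoint, leaderWebEndpoint);
    }

    public static void main(String[] args) {
        // The setup above: every JobManager has jobmanager.web.address: localhost
        // and jobmanager.web.port: 8081, so all of them advertise the same endpoint.
        String leader = "localhost:8081";
        String standby = "localhost:8081";
        // The standby concludes "the leader is me" and serves the request itself,
        // so it must fetch the ExecutionGraph from the leader's remote actor system.
        System.out.println("standby redirects: " + shouldRedirect(standby, leader)); // false

        // With distinct addresses (hypothetical host names) the standby redirects instead.
        System.out.println("standby redirects: " + shouldRedirect("jm-2:8081", "jm-1:8081")); // true
    }
}
```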





On 16 January 2018 at 14:51, Till Rohrmann <[hidden email]> wrote:

Re: Failed to serialize remote message [class org.apache.flink.runtime.messages.JobManagerMessages$JobFound

Till Rohrmann
Hi,

Yes, you're right. The different standby JobManagers should each have a different web address.
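
Following this answer, a pre-deployment sanity check could verify that no two JobManagers advertise the same web endpoint. A minimal sketch, assuming you have already gathered each host's jobmanager.web.address and jobmanager.web.port into a map; findWebAddressConflicts and the host names are hypothetical, not part of Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WebAddressCheck {

    // Hypothetical helper. Input: host name -> "address:port" as read from that
    // host's flink-conf.yaml. Returns one message per endpoint used by more than
    // one JobManager; an empty list means every JobManager is distinguishable.
    static List<String> findWebAddressConflicts(Map<String, String> endpointByHost) {
        // Group hosts by endpoint; TreeMaps keep the output order deterministic.
        Map<String, List<String>> hostsByEndpoint = new TreeMap<>();
        for (Map.Entry<String, String> e : new TreeMap<>(endpointByHost).entrySet()) {
            hostsByEndpoint.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        List<String> conflicts = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : hostsByEndpoint.entrySet()) {
            if (e.getValue().size() > 1) {
                conflicts.add(e.getKey() + " is shared by " + e.getValue());
            }
        }
        return conflicts;
    }

    public static void main(String[] args) {
        Map<String, String> before = new HashMap<>();
        before.put("jm-1", "localhost:8081");
        before.put("jm-2", "localhost:8081");
        Map<String, String> after = new HashMap<>();
        after.put("jm-1", "jm-1.example.com:8081");
        after.put("jm-2", "jm-2.example.com:8081");
        System.out.println(findWebAddressConflicts(before)); // [localhost:8081 is shared by [jm-1, jm-2]]
        System.out.println(findWebAddressConflicts(after));  // []
    }
}
```

With the original configuration both hosts map to localhost:8081 and are reported; giving each JobManager its own address clears the conflict.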

Cheers,
Till

On Tue, Jan 16, 2018 at 6:32 PM, jelmer <[hidden email]> wrote: