ExecutionGraph not serializable

XiangWei Huang
Hi Flink users,
The Flink JobManager throws a NotSerializableException when I use JobMasterGateway to get the ExecutionGraph of a specific job with the message RequestJob(jobID). Below are the details of the exception:


[ERROR] [akka.remote.EndpointWriter] - Transient association error (association remains live)
java.io.NotSerializableException: org.apache.flink.runtime.executiongraph.ExecutionGraph
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
	at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
	at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
	at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
	at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
	at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
	at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
	at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

So, is this a bug, or is this way of getting a job's ExecutionGraph invalid?

Best,
XiangWei

Re: ExecutionGraph not serializable

Fabian Hueske-2
Hi XiangWei,

I don't think this is a public interface, but Till (in CC) might know better.

Best,
Fabian

2017-11-06 3:27 GMT+01:00 XiangWei Huang <[hidden email]>:
[...]

Re: ExecutionGraph not serializable

Till Rohrmann

Hi XiangWei,

How are you using the JobMasterGateway with the actor message RequestJob? The JobMasterGateway is a Java interface and does not represent an ActorCell to which you can send actor messages. Instead, you should call JobMasterGateway#requestArchivedExecutionGraph.
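As a rough illustration of the difference between sending an untyped actor message and calling a method on a typed gateway interface, here is a minimal Scala sketch. It does not use Flink: JobGateway, GraphSnapshot, requestArchivedGraph, and GatewaySketch are hypothetical names made up for this example, and the real signature of JobMasterGateway#requestArchivedExecutionGraph is not shown in this thread.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical snapshot type; stands in for an archived, data-only view of a job.
final case class GraphSnapshot(jobName: String, state: String)

// Hypothetical typed gateway: the caller invokes a method and receives a typed
// Future, instead of sending an untyped message and casting the reply.
trait JobGateway {
  def requestArchivedGraph(): Future[GraphSnapshot]
}

object GatewaySketch {
  // In-memory implementation so the sketch runs without a cluster.
  val localGateway: JobGateway = new JobGateway {
    def requestArchivedGraph(): Future[GraphSnapshot] =
      Future.successful(GraphSnapshot("wordcount", "RUNNING"))
  }

  // Block for the result with a timeout, as a client would.
  def fetch(gateway: JobGateway): GraphSnapshot =
    Await.result(gateway.requestArchivedGraph(), 10.seconds)

  def main(args: Array[String]): Unit =
    println(fetch(localGateway))
}
```

With a typed method, the return type is checked by the compiler, so no asInstanceOf cast on the reply is needed.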

Cheers,
Till


On Tue, Nov 7, 2017 at 9:58 AM, Fabian Hueske <[hidden email]> wrote:
[...]

Re: ExecutionGraph not serializable

XiangWei Huang
Hi Till,

Sorry, I made a mistake: I used StandaloneClusterClient#getJobManagerGateway to get an ActorGateway to communicate with the JobManager, rather than JobMasterGateway.
Below is the code I executed to get the ExecutionGraph of a job.


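    import java.util.concurrent.TimeUnit
    import scala.collection.JavaConverters._
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.FiniteDuration
    import org.apache.flink.client.program.StandaloneClusterClient
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.runtime.messages.JobManagerMessages._

    val flinkConfig = new Configuration()
    val flinkCli = new StandaloneClusterClient(flinkConfig)
    val jobManagerGateWay = flinkCli.getJobManagerGateway
    val timeOut = new FiniteDuration(10, TimeUnit.SECONDS)
    val jobs = jobManagerGateWay.ask(RequestRunningJobsStatus, timeOut).asInstanceOf[Future[RunningJobsStatus]]
    val jobsStatus = Await.result(jobs, timeOut).getStatusMessages().asScala.head
    val jobId = jobsStatus.getJobId
    // This request is the one that makes the JobManager throw the NotSerializableException.
    val future = jobManagerGateWay.ask(RequestJob(jobId), timeOut)
    val result = Await.result(future, timeOut)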
 
The JobManager threw a NotSerializableException when I executed this code. So I wonder how this happened, and whether there is another way to get a job's ExecutionGraph programmatically.

Best,
XiangWei

2017-11-07 17:16 GMT+08:00 Till Rohrmann <[hidden email]>:

[...]

Re: ExecutionGraph not serializable

Till Rohrmann
Hi XiangWei,

It is actually not intended that you get access to the ExecutionGraph, because it is a runtime component that does not make much sense outside of the JobManager. The RequestJob message is only a hack to make the ExecutionGraph accessible to another actor running in the same ActorSystem. This is the case for the WebRuntimeMonitor handlers. With FLIP-6, we will make the ExecutionGraph indirectly accessible by returning an ArchivedExecutionGraph.
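To illustrate why the reply fails only when it has to leave the process: a message handed to an actor in the same ActorSystem is passed by reference, whereas a reply to a remote client must go through Java serialization, which is the akka.serialization.JavaSerializer frame visible in the stack trace. Below is a minimal, self-contained sketch of that failure and of the snapshot idea. It does not use Flink or Akka; RuntimeGraph, ArchivedGraph, JobReply, and SerializationSketch are hypothetical stand-ins invented for this example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException}
import java.io.{ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a runtime-only component; deliberately NOT Serializable.
class RuntimeGraph(val jobName: String, val worker: Thread)

// Hypothetical snapshot holding only plain data. Scala case classes are Serializable.
final case class ArchivedGraph(jobName: String, workerName: String)

// Hypothetical reply message; its payload is serialized only for remote delivery.
final case class JobReply(payload: AnyRef)

object SerializationSketch {
  // Java-serialize a value, as a remote transport would have to.
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.flush()
    bytes.toByteArray
  }

  // Read a value back from its serialized form.
  def deserialize(data: Array[Byte]): AnyRef =
    new ObjectInputStream(new ByteArrayInputStream(data)).readObject()

  // Copy the plain data out of the live object into a serializable snapshot.
  def archive(graph: RuntimeGraph): ArchivedGraph =
    ArchivedGraph(graph.jobName, graph.worker.getName)

  def main(args: Array[String]): Unit = {
    val live = new RuntimeGraph("wordcount", new Thread("graph-worker"))

    // Wrapping the live object in a serializable message does not help:
    // serialization fails when it reaches the non-serializable field.
    try {
      serialize(JobReply(live))
      println("unexpected: live graph was serialized")
    } catch {
      case e: NotSerializableException => println(s"not serializable: ${e.getMessage}")
    }

    // The snapshot survives a full serialization round trip.
    println(deserialize(serialize(JobReply(archive(live)))))
  }
}
```

The first call fails inside the field-writing step of the wrapper message, which matches the defaultWriteFields frame in the reported trace; the archived copy goes through because it holds only serializable data.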

Cheers,
Till

On Tue, Nov 7, 2017 at 2:47 PM, XiangWei Huang <[hidden email]> wrote:
[...]