Login  Register

Get Flink ExecutionGraph Programmatically

classic Classic list List threaded Threaded
1 message Options Options
Embed post
Permalink
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Get Flink ExecutionGraph Programmatically

Chawla,Sumit
4 posts

Hi All


I am trying to get JOB  accumulators.  ( I am aware that I can get the accumulators through REST APIs as well, but i wanted to avoid JSON parsing).  

Looking at JobAccumulatorsHandler i am trying to get execution graph for currently running job.  Following is my code:

  InetSocketAddress initialJobManagerAddress=new InetSocketAddress(hostName,port);
InetAddress ownHostname;
ownHostname= ConnectionUtils.
findConnectingAddress(initialJobManagerAddress,2000,400);

ActorSystem actorSystem= AkkaUtils.createActorSystem(configuration,
new Some(new Tuple2<String,Object>(ownHostname.getCanonicalHostName(),0)));

FiniteDuration timeout= FiniteDuration.
apply(10, TimeUnit.SECONDS);

ActorGateway akkaActorGateway= LeaderRetrievalUtils.
retrieveLeaderGateway(
LeaderRetrievalUtils.
createLeaderRetrievalService(configuration),
actorSystem,timeout
);


Future<Object> future=akkaActorGateway.ask(new RequestJobDetails(true,false),timeout);

MultipleJobsDetails result=(MultipleJobsDetails) Await.
result(future,timeout);
ExecutionGraphHolder executionGraphHolder=
new ExecutionGraphHolder(timeout);
LOG.info(result.toString());
for(JobDetails detail:result.getRunningJobs()){
LOG.info(detail.getJobName() + " ID " + detail.getJobId());
ExecutionGraph executionGraph=executionGraphHolder.getExecutionGraph(detail.getJobId(),akkaActorGateway);
LOG.info("Accumulators " + executionGraph.aggregateUserAccumulators());
}
 
However, i am receiving following error in Flink:

2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.CheckpointCoordinator
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_92]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
        at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) ~[akka-remote_2.10-2.3.7.jar:?]
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
        at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844) ~[akka-remote_2.10-2.3.7.jar:?]

Any reason why its failing? This code works when invoked through WebRuntimeMonitor. 

Regards
Sumit Chawla