Hello,
I am trying to submit multiple jobs to flink from within my Java program. I am running into an exception that may be related: java.io.OptionalDataException. Should I be able to create multiple plans/jobs within a single session and execute them concurrently? If so, is there a working example you can point me at? Do I share the same ExecutionEnvironment? It looks like calls to getExecutionEnvironment() return the same one. I have a number of different transformations on my data I'd like to make. I'd rather not create one very large job and have them processed in parallel. My cluster has enough resources that performing each job sequentially would be very wasteful. Thank you, David Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15) java.io.OptionalDataException at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) at java.util.HashMap.readObject(HashMap.java:1407) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58) at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:467) at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi David, I cannot exactly tell how you ended up seeing an OptionalDataException without seeing your code. Flink supports to run multiple jobs on the same cluster. That’s what we call the session mode. You should not reuse the Cheers, On Thu, Oct 26, 2017 at 10:22 PM, David Dreyfus <[hidden email]> wrote: Hello, |
Free forum by Nabble | Edit this page |