Execute multiple jobs in parallel (threading): java.io.OptionalDataException

David Dreyfus
Hello,

I am trying to submit multiple jobs to Flink from within my Java program.
I am running into an exception that may be related:
java.io.OptionalDataException.

Should I be able to create multiple plans/jobs within a single session and
execute them concurrently?
If so, is there a working example you can point me at?

Should I share the same ExecutionEnvironment?
It looks like calls to getExecutionEnvironment() return the same one.

I have a number of different transformations I'd like to apply to my data.
I'd rather not create one very large job; instead, I'd like to have them
processed in parallel.
My cluster has enough resources that performing each job sequentially would
be very wasteful.

Thank you,
David

Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15)
java.io.OptionalDataException
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
        at java.util.HashMap.readObject(HashMap.java:1407)
        at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
        at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Execute multiple jobs in parallel (threading): java.io.OptionalDataException

Till Rohrmann

Hi David,

Without seeing your code, I cannot tell exactly how you ended up with an OptionalDataException.

Flink supports running multiple jobs on the same cluster. That's what we call session mode.

You should not reuse the ExecutionEnvironment, because then you will create a single job that simply consists of multiple disjoint parts. Calling ExecutionEnvironment.getExecutionEnvironment will give you a fresh ExecutionEnvironment, which you can use to submit a new job. Note that env.execute is a blocking operation, so you have to call it in a separate thread for each job.
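The pattern described above (a fresh environment per job, with each blocking execute call on its own thread) can be sketched with plain JDK concurrency utilities. To keep the snippet compilable without Flink on the classpath, BlockingJob and runAll are hypothetical names introduced only for this illustration: BlockingJob.run() stands in for building a plan on a new ExecutionEnvironment and calling env.execute(...). This is a minimal sketch under those assumptions, not code taken from the Flink documentation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelJobs {

    /**
     * Hypothetical stand-in for one Flink job. In a real program run() would
     * (assumption, DataSet API) do roughly:
     *   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
     *   env.readTextFile(input) ... .writeAsText(output);
     *   env.execute(jobName);   // blocks until the job has finished
     */
    interface BlockingJob {
        String run() throws Exception;
    }

    /** Runs every job on its own thread and waits until all have finished. */
    static List<String> runAll(List<BlockingJob> jobs)
            throws InterruptedException, ExecutionException {
        // One thread per job, so a blocking run() never delays the others.
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, jobs.size()));
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (BlockingJob job : jobs) {
                Callable<String> task = job::run;
                futures.add(pool.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                // get() waits for the job; a job's exception is rethrown
                // wrapped in an ExecutionException.
                results.add(future.get());
            }
            return results;
        } finally {
            // No new jobs are accepted; already-running jobs are not interrupted.
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<BlockingJob> jobs = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final String name = "job-" + i;
            jobs.add(() -> {
                Thread.sleep(200); // simulates a blocking env.execute(...)
                return name + " finished";
            });
        }
        long start = System.nanoTime();
        List<String> results = runAll(jobs);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results + " after " + elapsedMs + " ms");
    }
}
```

Because each job runs on its own thread, the total wall-clock time should be close to that of the slowest job rather than the sum of all of them. Collecting results through Future.get() also means a failed submission surfaces as an ExecutionException instead of being lost on a background thread.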

Cheers,
Till


(In reply to David Dreyfus's message of Thu, Oct 26, 2017 at 10:22 PM.)