Hello - The following piece of code is an example of connected data streams ..

  val rides: DataStream[TaxiRide] =
    readStream(inTaxiRide)
      .filter { ride ⇒ ride.getIsStart().booleanValue }
      .keyBy("rideId")

  val fares: DataStream[TaxiFare] =
    readStream(inTaxiFare)
      .keyBy("rideId")

  val processed: DataStream[TaxiRideFare] =
    rides
      .connect(fares)
      .flatMap(new EnrichmentFunction)

But if I try to execute the above from within a scala.concurrent.Future, I get the following exception ..

  org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields.
      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
      at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
      at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
      at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
      ...
  Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

Any thoughts on why this may happen?
Does TaxiRide or TaxiRideFare implement Serializable? On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <[hidden email]> wrote:
Yes, they are generated from an Avro schema and implement Serializable .. On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <[hidden email]> wrote:
Debasish Ghosh http://manning.com/ghosh2
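(A nuance worth spelling out, since it matters for the error above: the generated record classes are Serializable, but with the Avro version in play here org.apache.avro.Schema is not, which is exactly what the NotSerializableException reports for Schema$Field. The generated classes get away with this because their schema lives in a static field, which Java serialization skips. A quick check, as a sketch:)

  import org.apache.avro.Schema

  // The generated record class is Serializable (via SpecificRecordBase):
  println(classOf[java.io.Serializable].isAssignableFrom(classOf[TaxiRide])) // true
  // But a Schema instance reachable through a captured *instance* field is not:
  println(classOf[java.io.Serializable].isAssignableFrom(classOf[Schema]))   // false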
My main question is why serialization kicks in when I try to execute within a `Future` but not otherwise. regards. On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <[hidden email]> wrote:
Hi Debasish, I guess the reason is that something is unexpectedly getting involved in serialization through a reference from an inner class (an anonymous class or a lambda expression). When Flink serializes such an inner-class instance, it also serializes all objects it references, for example the instance of the outer class. If the outer class is not serializable, this error happens. You could try moving that piece of code to a named, non-inner class. Thanks, Biao /'bɪ.aʊ/ On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <[hidden email]> wrote:
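(To illustrate Biao's suggestion - a minimal sketch of the capture problem; StreamBuilder and schemaHolder are hypothetical names, while TaxiRide/TaxiFare/TaxiRideFare are the generated classes from the thread:)

  import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.util.Collector

  // BAD: an anonymous function class defined inside a (possibly
  // non-serializable) enclosing class. Referencing schemaHolder makes the
  // anonymous class capture the outer StreamBuilder instance, which the
  // ClosureCleaner then tries, and fails, to serialize.
  class StreamBuilder(schemaHolder: AnyRef /* not Serializable */) {
    def build(rides: ConnectedStreams[TaxiRide, TaxiFare]): DataStream[TaxiRideFare] =
      rides.flatMap(new RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
        override def flatMap1(r: TaxiRide, out: Collector[TaxiRideFare]): Unit = {
          val s = schemaHolder // this reference drags in the outer instance
        }
        override def flatMap2(f: TaxiFare, out: Collector[TaxiRideFare]): Unit = ()
      })
  }

  // GOOD: a named top-level class captures nothing from an enclosing scope.
  class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
    override def flatMap1(r: TaxiRide, out: Collector[TaxiRideFare]): Unit = ()
    override def flatMap2(f: TaxiFare, out: Collector[TaxiRideFare]): Unit = ()
  }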
I think the issue may not be linked to the Future. What happens is that when this piece of code is executed ..

  val rides: DataStream[TaxiRide] =
    readStream(inTaxiRide)
      .filter { ride ⇒ ride.getIsStart().booleanValue }
      .keyBy("rideId")

  val fares: DataStream[TaxiFare] =
    readStream(inTaxiFare)
      .keyBy("rideId")

  val processed: DataStream[TaxiRideFare] =
    rides
      .connect(fares)
      .flatMap(new EnrichmentFunction)

the ClosureCleaner gets executed, as evident from the following stack trace, and it tries to serialize Avro data. Is there any way to pass the custom Avro serializer that I am using?

  org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields.
      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
      at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
      at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
      at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
      at pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
      at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
      at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
      at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
      at scala.util.Try$.apply(Try.scala:213)
      at pipelines.runner.Runner$.run(Runner.scala:43)
      at pipelines.runner.Runner$.main(Runner.scala:30)
      at pipelines.runner.Runner.main(Runner.scala)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
      at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
      at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
      at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
      at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
      at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
      at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
  Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      at java.util.ArrayList.writeObject(ArrayList.java:766)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)

I also tried the following ..

  class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {

    @transient var rideState: ValueState[TaxiRide] = null
    @transient var fareState: ValueState[TaxiFare] = null

    override def open(params: Configuration): Unit = {
      super.open(params)
      rideState = getRuntimeContext.getState(
        new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
      fareState = getRuntimeContext.getState(
        new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
    }
    // ...
  }

and moved the state initialization to the open function. But I still get the same result. Help? regards. On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <[hidden email]> wrote:
Debasish Ghosh http://manning.com/ghosh2
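(For context on why ClosureCleaner shows up here: before shipping a user function, Flink round-trips it through Java serialization, together with everything it references. A minimal sketch of the equivalent check - simplified, not the actual Flink code:)

  import java.io.{ByteArrayOutputStream, ObjectOutputStream}

  // This is essentially the check ClosureCleaner performs (see
  // InstantiationUtil.serializeObject in the trace above): if `fn`
  // transitively references an org.apache.avro.Schema, writeObject throws
  // the NotSerializableException: org.apache.avro.Schema$Field seen above.
  val fn = new EnrichmentFunction
  val oos = new ObjectOutputStream(new ByteArrayOutputStream())
  oos.writeObject(fn)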
OK, the above problem was due to serialization issues, which we fixed by marking some of the fields transient. That fixed the serialization errors. But now when I try to execute in a Future I hit upon this ..

  java.util.concurrent.ExecutionException: Boxed Error
      at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
      at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
      at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
      at scala.concurrent.Promise.complete(Promise.scala:53)
      at scala.concurrent.Promise.complete$(Promise.scala:52)
      at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
      at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
      at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
      at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
      at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
      at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
      at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
      at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
  Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
      at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
      at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
      at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
      at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
      at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
      at scala.util.Success.$anonfun$map$1(Try.scala:255)
      at scala.util.Success.map(Try.scala:213)
      at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
      at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
      ... 7 more

I found this issue in JIRA, https://issues.apache.org/jira/browse/FLINK-10381, which is still open and talks about a related problem. But we are not submitting multiple jobs - we are submitting just one job, asynchronously in a Future. I am not clear why this should create the problem that I see. Can anyone please help with an explanation? regards. On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh <[hidden email]> wrote:
Debasish Ghosh http://manning.com/ghosh2
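(A data point that may help frame the question: the "Boxed Error" wrapper comes from scala.concurrent itself. A Scala Promise boxes any java.lang.Error that completes a Future in an ExecutionException, and ProgramAbortException extends Error. A minimal sketch reproducing just the boxing, with no Flink involved:)

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  // Throwing an Error (not an Exception) inside a Future: the Promise
  // resolver wraps it, so the caller sees "Boxed Error" with the real
  // cause underneath - the same shape as the trace above.
  val f = Future[Unit] { throw new Error("stand-in for ProgramAbortException") }
  Await.result(f, 5.seconds) // throws java.util.concurrent.ExecutionException: Boxed Error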
Hi Debasish, Have you taken a look at the AsyncIO API for running async operations? I think this is the preferred way of doing it. [1] So it would look something like this:
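(The example itself did not survive in the archive; what follows is a sketch of Flink's async I/O pattern in Scala - the AsyncEnrichment class and the lookup it performs are hypothetical:)

  import java.util.concurrent.TimeUnit
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

  // Hypothetical async enrichment: look up the fare for a ride from some
  // external service and emit the joined record when the lookup completes.
  class AsyncEnrichment extends AsyncFunction[TaxiRide, TaxiRideFare] {
    override def asyncInvoke(ride: TaxiRide, resultFuture: ResultFuture[TaxiRideFare]): Unit = {
      // fire the async request here; complete the ResultFuture from its callback
      resultFuture.complete(Iterable.empty)
    }
  }

  val processed: DataStream[TaxiRideFare] =
    AsyncDataStream.unorderedWait(rides, new AsyncEnrichment, 1000, TimeUnit.MILLISECONDS, 100)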
On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh <[hidden email]> wrote:
I think what you are pointing at is asynchronous DataStream operations. In our case we want to submit the entire job in a Future. Something like the following ..

  def execute(..) = {
    // this does all data stream manipulation, joins etc.
    buildComputationGraph()
    // submits for execution with StreamExecutionEnvironment
    env.execute(..)
  }

and we want to do ..

  val jobExecutionResultFuture = Future(execute(..))

and this gives that exception. regards. On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch <[hidden email]> wrote:
Debasish Ghosh http://manning.com/ghosh2
Hi Debasish, I think there is something critical hidden in your usage. It might help if you could provide more details. It still confuses me how you solved the serialization issue. Why did the non-transient fields only affect serialization inside a Future? Regarding this ProgramAbortException issue, do you submit jobs concurrently in one process? Currently job submission is not thread-safe. It relies on some static variables, which could be affected by other concurrent submissions in the same process. I am asking because job submission usually does not go through OptimizerPlanEnvironment, which appears in your exception stack trace. Thanks, Biao /'bɪ.aʊ/ On Thu, 19 Sep 2019 at 15:03, Debasish Ghosh <[hidden email]> wrote:
We solved the serialization problem by making transient the things that were being captured as part of the closure, so we no longer have serialization errors. Everything works properly without the Future. I realize that, because of the statics, concurrent job submission would be an issue. But we are submitting only one job; the difference is that it's through a Future. So there is no concurrent submission, unless I am missing something. regards. On Thu, Sep 19, 2019 at 12:54 PM Biao Liu <[hidden email]> wrote:
Debasish Ghosh http://manning.com/ghosh2
Debasish, could you share an example of the before and after of your classes, for future reference? On Thu, 19 Sep 2019, 10:42 Debasish Ghosh, <[hidden email]> wrote:
Hi Yuval - Here's a brief summary of what we are trying to do ..

At the library level we have this ..

  def buildExecutionGraph(): Unit

  def executeStreamingQueries(env: StreamExecutionEnvironment): JobExecutionResult = {
    buildExecutionGraph()
    env.execute(s"Executing $streamletRef")
  }

and we do the following ..

  // note this ctx is created outside the Future
  val jobResult = Future(createLogic.executeStreamingQueries(ctx.env))

and at the application level we have something like this ..

  override def buildExecutionGraph = {
    val rides: DataStream[TaxiRide] =
      readStream(inTaxiRide) // reads from Kafka
        .filter { ride ⇒ ride.getIsStart().booleanValue }
        .keyBy("rideId")

    val fares: DataStream[TaxiFare] =
      readStream(inTaxiFare)
        .keyBy("rideId")

    val processed: DataStream[TaxiRideFare] =
      rides
        .connect(fares)
        .flatMap(new EnrichmentFunction)

    writeStream(out, processed) // writes to Kafka
  }

It fails only when we use the Future; otherwise it works .. regards. On Thu, Sep 19, 2019 at 1:16 PM Yuval Itzchakov <[hidden email]> wrote:
Debasish Ghosh http://manning.com/ghosh2
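(As for the before/after Yuval asked about: the thread shows only the "after", so the "before" here is an illustrative reconstruction of the kind of field that gets captured, not the original code:)

  import org.apache.avro.Schema
  import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
  import org.apache.flink.configuration.Configuration
  import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
  import org.apache.flink.util.Collector

  // Before (illustrative): a schema-bearing object held as a plain instance
  // field. Java serialization of the function recurses into
  // org.apache.avro.Schema$Field and fails.
  class EnrichmentBefore extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
    val rideSchema: Schema = TaxiRide.getClassSchema() // Schema is not Serializable here
    override def flatMap1(r: TaxiRide, out: Collector[TaxiRideFare]): Unit = ()
    override def flatMap2(f: TaxiFare, out: Collector[TaxiRideFare]): Unit = ()
  }

  // After (as in the earlier message): fields are @transient and initialized
  // in open(), so nothing schema-bearing travels with the serialized function.
  class EnrichmentAfter extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
    @transient var rideState: ValueState[TaxiRide] = null
    override def open(params: Configuration): Unit = {
      super.open(params)
      rideState = getRuntimeContext.getState(
        new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
    }
    override def flatMap1(r: TaxiRide, out: Collector[TaxiRideFare]): Unit = ()
    override def flatMap2(f: TaxiFare, out: Collector[TaxiRideFare]): Unit = ()
  }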