serialization issue in streaming job run with scala Future

Debasish Ghosh
Hello -

The following piece of code is an example of connected data streams ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")

val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")

val processed: DataStream[TaxiRideFare] =
  rides
    .connect(fares)
    .flatMap(new EnrichmentFunction)


When I execute the above logic using StreamExecutionEnvironment.execute(..), it runs fine.
But if I try to execute the above from within a scala.concurrent.Future, I get the following exception ..

org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields.
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
            at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
            at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
            at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
            at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
  ...

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
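
In case it helps, this is roughly how the job gets submitted (a simplified sketch; env is the StreamExecutionEnvironment, buildComputationGraph() contains the code above, and the job name is illustrative):

import org.apache.flink.api.common.JobExecutionResult
import scala.concurrent.{ExecutionContext, Future}

implicit val ec: ExecutionContext = ExecutionContext.global

val result: Future[JobExecutionResult] = Future {
  buildComputationGraph()         // the connect / flatMap code above
  env.execute("taxi-ride-fare")   // illustrative job name
}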

Any thoughts on why this may happen?

Re: serialization issue in streaming job run with scala Future

Deepak Sharma
Do TaxiRide or TaxiRideFare implement Serializable?

Re: serialization issue in streaming job run with scala Future

Debasish Ghosh
Yes, they are generated from an Avro schema and implement Serializable ..

Re: serialization issue in streaming job run with scala Future

Debasish Ghosh
My main question is why serialisation kicks in when I try to execute within a `Future` and not otherwise. 

regards.

Re: serialization issue in streaming job run with scala Future

Biao Liu
Hi Debasish,

I guess the reason is that something is unexpectedly pulled into serialization through a reference from an inner class (an anonymous class or a lambda expression).
When Flink serializes such an inner-class instance, it also serializes all referenced objects, for example the outer class instance. If the outer class is not serializable, this error happens.

You could try moving that piece of code to a named, non-inner class.
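
A rough, Flink-independent sketch of what I mean (all class and field names here are made up):

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class JobLogic { // NOT serializable, like a logic class holding an Avro Schema
  val schemaLike = new Object // stands in for the non-serializable Schema

  // anonymous inner class: referencing `schemaLike` makes it capture the
  // enclosing JobLogic instance through a hidden $outer field
  val innerFn = new (Int => Int) with Serializable {
    def apply(x: Int): Int = { schemaLike.hashCode(); x + 1 }
  }
}

// named top-level class: carries nothing but its own (serializable) fields
class AddOne extends (Int => Int) with Serializable {
  def apply(x: Int): Int = x + 1
}

object SerializationDemo extends App {
  def trySerialize(o: AnyRef): Unit =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(o)
      println(s"${o.getClass.getName}: serializable")
    } catch {
      case e: NotSerializableException => println(s"${o.getClass.getName}: $e")
    }

  trySerialize(new JobLogic().innerFn) // fails: drags in JobLogic via $outer
  trySerialize(new AddOne)             // fine
}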

Thanks,
Biao /'bɪ.aʊ/

Re: serialization issue in streaming job run with scala Future

Debasish Ghosh
I think the issue may not be linked to the Future. What happens is that when this piece of code is executed ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")

val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")

val processed: DataStream[TaxiRideFare] =
  rides
    .connect(fares)
    .flatMap(new EnrichmentFunction)

the ClosureCleaner somehow gets executed, as is evident from the following, and it tries to serialize the Avro data. Is there any way to pass the custom Avro serializer that I am using?

org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields.
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
            at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
            at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
            at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
            at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
            at pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
            at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
            at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
            at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
            at scala.util.Try$.apply(Try.scala:213)
            at pipelines.runner.Runner$.run(Runner.scala:43)
            at pipelines.runner.Runner$.main(Runner.scala:30)
            at pipelines.runner.Runner.main(Runner.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
            at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
            at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
            at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
            at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
            at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
            at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
            at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
            at java.util.ArrayList.writeObject(ArrayList.java:766)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
            at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
            at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)


I also tried the following ..

class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {

    // state handles are not serializable, so they are marked @transient and
    // initialized in open(), which runs on the task manager after deployment
    @transient var rideState: ValueState[TaxiRide] = null
    @transient var fareState: ValueState[TaxiFare] = null

    override def open(params: Configuration): Unit = {
      super.open(params)
      rideState = getRuntimeContext.getState(
        new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
      fareState = getRuntimeContext.getState(
        new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
    }


and moved the state initialization to the open() method. But I still get the same result.

Help?

regards.

Re: serialization issue in streaming job run with scala Future

Debasish Ghosh
ok, the above problem was due to some serialization issues, which we fixed by marking some of the fields transient, roughly along these lines (a sketch; the class and field names are made up):
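
import org.apache.avro.Schema

// sketch of the idea: an Avro Schema is not Java-serializable, so we ship
// its JSON form and rebuild the Schema lazily on the task side after
// deserialization; SchemaAware and schemaJson are made-up names
class SchemaAware(schemaJson: String) extends Serializable {
  @transient private lazy val schema: Schema =
    new Schema.Parser().parse(schemaJson)
}

This fixes the serialization issues .. But now, when I try to execute in a Future, I hit upon this ..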

java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.complete(Promise.scala:53)
at scala.concurrent.Promise.complete$(Promise.scala:52)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
... 7 more

I found FLINK-10381 (https://issues.apache.org/jira/browse/FLINK-10381), which is still open and talks about a related issue. But we are not submitting multiple jobs; we are submitting a single job, just asynchronously in a Future. I am not clear why this should create the problem that I see.
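
My guess from the stack trace: ProgramAbortException extends Error, and a Scala Promise boxes fatal throwables, so the same wrapping can be reproduced outside Flink (a bare-bones sketch, assuming I am reading the Promise internals correctly):

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object BoxedErrorDemo extends App {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // java.lang.Error is fatal, so the Promise completes the Future with
  // ExecutionException("Boxed Error") instead of the Error itself
  val f = Future[Unit] { throw new Error("stand-in for ProgramAbortException") }

  Await.ready(f, 5.seconds)
  println(f.value) // Some(Failure(java.util.concurrent.ExecutionException: Boxed Error))
}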

Can anyone please help with an explanation?

regards.

On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh <[hidden email]> wrote:
I think the issue may not be linked with Future. What happens is when this piece of code is executed ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")

val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")

val processed: DataStream[TaxiRideFare] =
  rides
    .connect(fares)
    .flatMap(new EnrichmentFunction)

somehow the ClosureCleaner gets executed as evident from the following which tries to serialize Avro data. Is there any way to pass the custom avro serializer that I am using ? 

org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields.
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
            at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
            at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
            at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
            at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
            at pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
            at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
            at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
            at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
            at scala.util.Try$.apply(Try.scala:213)
            at pipelines.runner.Runner$.run(Runner.scala:43)
            at pipelines.runner.Runner$.main(Runner.scala:30)
            at pipelines.runner.Runner.main(Runner.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
            at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
            at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
            at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
            at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
            at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
            at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
            at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
            at java.util.ArrayList.writeObject(ArrayList.java:766)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
            at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
            at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)


I also tried the following ..

class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {

    @transient var rideState: ValueState[TaxiRide] = null
    @transient var fareState: ValueState[TaxiFare] = null

    override def open(params: Configuration): Unit = {
      super.open(params)
      rideState = getRuntimeContext.getState(
        new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
      fareState = getRuntimeContext.getState(
        new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
    }


and moved the state initialization to open function. But still get the same result.

Help ?

regards.



On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <[hidden email]> wrote:
Hi Debasish,

I guess the reason is something unexpectedly involved in serialization due to a reference from inner class (anonymous class or lambda expression).
When Flink serializes this inner class instance, it would also serialize all referenced objects, for example, the outer class instance. If the outer class is not serializable, this error would happen.

You could have a try to move the piece of codes to a named non-inner class.

Thanks,
Biao /'bɪ.aʊ/



On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <[hidden email]> wrote:
My main question is why serialisation kicks in when I try to execute within a `Future` and not otherwise. 

regards.

On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <[hidden email]> wrote:
Yes, they are generated from Avro Schema and implements Serializable .. 

On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <[hidden email]> wrote:
Does TaxiRide or TaxiRideFare implements Serializable?

On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <[hidden email]> wrote:
Hello -

The following piece of code is an example of a connected data streams ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")

val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")

val processed: DataStream[TaxiRideFare] =
  rides
    .connect(fares)
    .flatMap(new EnrichmentFunction)


When I execute the above logic using StreamExecutionEnvironment.execute(..) it runs fine. 
But if I try to execute the above from within a scala.concurrent.Future, I get the following exception ..

org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields.
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
            at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
            at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
            at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
            at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
  ...

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

Any thoughts why this may happen ?



--


--
--
Sent from my iPhone


--


--
Reply | Threaded
Open this post in threaded view
|

Re: serialization issue in streaming job run with scala Future

Rafi Aroch
Hi Debasish,

Have you taken a look at the AsyncIO API for running async operations? I think this is the preferred way of doing it. [1]
So it would look something like this:

class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())


    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}
// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

Re: serialization issue in streaming job run with scala Future

Debasish Ghosh
I think what you are pointing at is asynchronous DataStream operations (AsyncIO). In our case we want to submit the entire job in a Future, something like the following ..

def execute(..) = {
  // this does all data stream manipulation, joins etc.
  buildComputationGraph()

  // submits for execution with StreamExecutionEnvironment
  env.execute(..)
}

and we want to do ..

val jobExecutionResultFuture = Future(execute(..))

and this gives that exception.

regards.

On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch <[hidden email]> wrote:
Hi Debasish,

Have you taken a look at the AsyncIO API for running async operations? I think this is the preferred way of doing it. [1]
So it would look something like this:

class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())


    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}
// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh <[hidden email]> wrote:
ok, the above problem was due to some serialization issues which we fixed by marking some of the things transient. This fixes the serialization issues .. But now when I try to execute in a Future I hit upon this ..

java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.complete(Promise.scala:53)
at scala.concurrent.Promise.complete$(Promise.scala:52)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
... 7 more

I found this issue in JIRA https://issues.apache.org/jira/browse/FLINK-10381 which is still open and talks about a related issue. But we are not submitting multiple jobs - we are just submitting 1 job but async in a Future. I am not clear why this should create the problem that I see.

Can anyone please help with an explanation ?

regards.

On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh <[hidden email]> wrote:
I think the issue may not be linked with Future. What happens is when this piece of code is executed ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")

val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")

val processed: DataStream[TaxiRideFare] =
  rides
    .connect(fares)
    .flatMap(new EnrichmentFunction)

somehow the ClosureCleaner gets executed as evident from the following which tries to serialize Avro data. Is there any way to pass the custom avro serializer that I am using ? 

org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields.
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
            at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
            at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
            at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
            at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
            at pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
            at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
            at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
            at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
            at scala.util.Try$.apply(Try.scala:213)
            at pipelines.runner.Runner$.run(Runner.scala:43)
            at pipelines.runner.Runner$.main(Runner.scala:30)
            at pipelines.runner.Runner.main(Runner.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
            at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
            at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
            at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
            at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
            at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
            at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
            at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
            at java.util.ArrayList.writeObject(ArrayList.java:766)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
            at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
            at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
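
For reference: Flink lets you register a custom Kryo serializer for types that Java serialization cannot handle, such as org.apache.avro.Schema. A minimal sketch follows; the AvroSchemaSerializer class is illustrative, not part of Flink. Note that this only governs how data and state are serialized at runtime; the failure above occurs while Java-serializing the user function itself, so the closure capture still has to be fixed separately.

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.Schema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Illustrative Kryo serializer that round-trips an Avro Schema through its JSON form.
class AvroSchemaSerializer extends Serializer[Schema] {
  override def write(kryo: Kryo, output: Output, schema: Schema): Unit =
    output.writeString(schema.toString) // toString yields the JSON schema definition

  override def read(kryo: Kryo, input: Input, tpe: Class[Schema]): Schema =
    new Schema.Parser().parse(input.readString())
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Fall back to this serializer whenever Kryo encounters an Avro Schema.
env.getConfig.addDefaultKryoSerializer(classOf[Schema], classOf[AvroSchemaSerializer])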


I also tried the following ..

class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {

    @transient var rideState: ValueState[TaxiRide] = null
    @transient var fareState: ValueState[TaxiFare] = null

    override def open(params: Configuration): Unit = {
      super.open(params)
      rideState = getRuntimeContext.getState(
        new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
      fareState = getRuntimeContext.getState(
        new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
    }


and moved the state initialization to the open function, but I still get the same result.

Help ?

regards.
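
For context, the flatMap1/flatMap2 bodies elided above typically look like the sketch below, modeled on the standard Flink training ride/fare join; the TaxiRideFare constructor and the getters are assumptions about the Avro-generated classes, not taken from this thread.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {

  @transient var rideState: ValueState[TaxiRide] = _
  @transient var fareState: ValueState[TaxiFare] = _

  override def open(params: Configuration): Unit = {
    rideState = getRuntimeContext.getState(
      new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
    fareState = getRuntimeContext.getState(
      new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
  }

  // emit a joined record as soon as both sides for a rideId have arrived
  override def flatMap1(ride: TaxiRide, out: Collector[TaxiRideFare]): Unit = {
    val fare = fareState.value
    if (fare != null) {
      fareState.clear()
      out.collect(new TaxiRideFare(ride.getRideId, fare.getTotalFare)) // assumed constructor
    } else rideState.update(ride)
  }

  override def flatMap2(fare: TaxiFare, out: Collector[TaxiRideFare]): Unit = {
    val ride = rideState.value
    if (ride != null) {
      rideState.clear()
      out.collect(new TaxiRideFare(fare.getRideId, fare.getTotalFare)) // assumed constructor
    } else fareState.update(fare)
  }
}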



On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <[hidden email]> wrote:
Hi Debasish,

I guess the reason is something unexpectedly involved in serialization due to a reference from inner class (anonymous class or lambda expression).
When Flink serializes this inner class instance, it would also serialize all referenced objects, for example, the outer class instance. If the outer class is not serializable, this error would happen.

You could have a try to move the piece of codes to a named non-inner class.

Thanks,
Biao /'bɪ.aʊ/
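
To make that suggestion concrete: a function that references a field of its enclosing class captures the outer `this`, and the ClosureCleaner then has to serialize the whole enclosing object; a top-level class carries no such hidden reference. A minimal sketch, with hypothetical names:

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

class Pipeline {                        // NOT Serializable
  val tag = "taxi"

  def bad(in: DataStream[String]): DataStream[String] =
    in.map(r => s"$tag: $r")            // references `tag`, so it captures `this`, a Pipeline
}

class Tagger(tag: String) extends MapFunction[String, String] {
  override def map(r: String): String = s"$tag: $r"  // no enclosing instance involved
}

class PipelineFixed {
  def good(in: DataStream[String]): DataStream[String] =
    in.map(new Tagger("taxi"))          // only the small, serializable Tagger is shipped
}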



On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <[hidden email]> wrote:
My main question is why serialisation kicks in when I try to execute within a `Future` and not otherwise. 

regards.

On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <[hidden email]> wrote:
Yes, they are generated from an Avro schema and implement Serializable ..


Re: serialization issue in streaming job run with scala Future

Biao Liu
Hi Debasish,

I think there is something critical about your usage that is hidden. It might help if you could provide more details.

It still confuses me how you solved the serialization issue. Why would the non-transient fields affect serialization only inside a Future?

WRT this ProgramAbortException issue: do you submit jobs concurrently in one process?
Currently job submission is not thread-safe. It relies on some static variables which could be affected by other concurrent submissions in the same process.
I am asking because job submission usually does not go through OptimizerPlanEnvironment, which appears in your exception stack trace.

Thanks,
Biao /'bɪ.aʊ/
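
The static state mentioned here can be pictured with a deliberately simplified sketch (this is not Flink's actual code): the submitting client installs an environment factory in a JVM-wide slot before invoking the user's main(), so two submissions running concurrently in the same process would race on that single slot.

trait Env { def execute(): Unit }

object ContextEnvironmentHolder {
  @volatile private var factory: Option[() => Env] = None

  def install(f: () => Env): Unit = factory = Some(f) // set by the client per submission
  def reset(): Unit = factory = None                  // cleared once main() returns
  def current(): Option[Env] = factory.map(_.apply()) // consulted by getExecutionEnvironment
}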



On Thu, 19 Sep 2019 at 15:03, Debasish Ghosh <[hidden email]> wrote:
I think what you are pointing at is asynchronous datastream operations. In our case we want to submit the entire job in a Future. Something like the following ..

def execute(..) = {
  // this does all data stream manipulation, joins etc.
  buildComputationGraph()

  // submits for execution with StreamExecutionEnvironment
  env.execute(..)
}

and we want to do ..

val jobExecutionResultFuture = Future(execute(..))

and this gives that exception.

regards.

On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch <[hidden email]> wrote:
Hi Debasish,

Have you taken a look at the AsyncIO API for running async operations? I think this is the preferred way of doing it. [1]
So it would look something like this:

class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())


    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}
// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh <[hidden email]> wrote:
OK, the above problem was due to some serialization issues, which we fixed by marking some of the offending fields transient. That fixes the serialization errors .. but now, when I try to execute in a Future, I hit this ..

java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.complete(Promise.scala:53)
at scala.concurrent.Promise.complete$(Promise.scala:52)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
... 7 more

I found https://issues.apache.org/jira/browse/FLINK-10381 in JIRA, which is still open and describes a related issue. But we are not submitting multiple jobs; we are submitting just one job, asynchronously in a Future. It is not clear to me why this should cause the problem that I see.

Can anyone please help with an explanation ?

regards.
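
For what it is worth, the Boxed Error wrapper above is simply how scala.concurrent reports a fatal Throwable (an Error rather than an Exception) thrown inside a Future. A minimal, self-contained sketch reproduces the shape of the failure; PlanExtracted here is a hypothetical stand-in for Flink's internal ProgramAbortException:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object PlanExtracted extends Error("plan extracted") // stand-in for ProgramAbortException

def execute(): Nothing = throw PlanExtracted // roughly what StreamPlanEnvironment.execute does

// Called directly, the Error propagates out of main() to the submitting client,
// which expects it and proceeds with the captured plan. Wrapped in a Future, it
// is captured by the underlying Promise instead and resurfaces as
// java.util.concurrent.ExecutionException: Boxed Error:
val f = Future(execute())
// Await.result(f, Duration.Inf) // throws ExecutionException("Boxed Error", PlanExtracted)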


Re: serialization issue in streaming job run with scala Future

Debasish Ghosh
We solved the serialization problem by marking transient some fields that were being captured as part of the closure. So we no longer have serialization errors; everything works properly without the Future.

I realize that, because of the static variables, concurrent job submission would be an issue. But we are submitting only one job; the difference is that it goes through a Future. So there is no concurrent submission, unless I am missing something.

regards.
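
Although the exact before/after is not shown here, a minimal sketch of that kind of fix looks like the following; RideLogic is hypothetical, and TaxiRide.SCHEMA$ assumes the usual Avro-generated specific record.

import org.apache.avro.Schema
import org.apache.flink.streaming.api.scala._

// Before: the lambda references `schema`, so it captures the enclosing
// instance, and with it the non-serializable org.apache.avro.Schema field.
class RideLogic extends Serializable {
  val schema: Schema = TaxiRide.SCHEMA$ // Schema is not java-serializable

  def graph(rides: DataStream[TaxiRide]): DataStream[TaxiRide] =
    rides.filter(r => schema.getName == "TaxiRide" && r.getIsStart().booleanValue)
}

// After: @transient lazy val keeps the field out of the serialized closure
// and recomputes it on first access after deserialization.
class RideLogicFixed extends Serializable {
  @transient lazy val schema: Schema = TaxiRide.SCHEMA$

  def graph(rides: DataStream[TaxiRide]): DataStream[TaxiRide] =
    rides.filter(r => schema.getName == "TaxiRide" && r.getIsStart().booleanValue)
}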


Re: serialization issue in streaming job run with scala Future

Yuval Itzchakov
Debasish, could you share a before-and-after example of your classes, for future reference?


Re: serialization issue in streaming job run with scala Future

Debasish Ghosh
Hi Yuval -

Here's a brief summary of what we are trying to do ..

At the library level we have this ..

def buildExecutionGraph(): Unit

def executeStreamingQueries(env: StreamExecutionEnvironment): JobExecutionResult = {
  buildExecutionGraph()
  env.execute(s"Executing $streamletRef")
}

and we do the following ..

// note this ctx is created outside the Future
val jobResult = Future(createLogic.executeStreamingQueries(ctx.env))

and at the application level we have something like this ..

    override def buildExecutionGraph = {
      val rides: DataStream[TaxiRide] =
        readStream(inTaxiRide) // reads from Kafka
          .filter { ride ⇒ ride.getIsStart().booleanValue }
          .keyBy("rideId")

      val fares: DataStream[TaxiFare] =
        readStream(inTaxiFare)
          .keyBy("rideId")

      val processed: DataStream[TaxiRideFare] =
        rides
          .connect(fares)
          .flatMap(new EnrichmentFunction)

      writeStream(out, processed) // writes to Kafka
    }

It fails only when we use the Future; otherwise it works ..

regards.
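
One way to get a Future back without breaking that client-side handshake is to run env.execute on the calling thread and wrap only its outcome, so fatal Throwables such as Flink's internal ProgramAbortException still propagate to the submitting machinery. A sketch, with hypothetical names:

import org.apache.flink.api.common.JobExecutionResult
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal

// Runs the submission synchronously; ordinary (non-fatal) failures are
// captured in the Future, while fatal Throwables, including the control-flow
// Error thrown by the plan-extraction environment, are rethrown.
def executeAsFuture(run: () => JobExecutionResult): Future[JobExecutionResult] = {
  val p = Promise[JobExecutionResult]()
  try p.success(run())
  catch { case NonFatal(e) => p.failure(e) } // anything fatal propagates to the caller
  p.future
}

// usage:
// val jobResult = executeAsFuture(() => createLogic.executeStreamingQueries(ctx.env))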

On Thu, Sep 19, 2019 at 1:16 PM Yuval Itzchakov <[hidden email]> wrote:
Debshish, could you share an example of before and after of your classes for future reference?

On Thu, 19 Sep 2019, 10:42 Debasish Ghosh, <[hidden email]> wrote:
We solved the problem of serialization by making some things transient which were being captured as part of the closure. So we no longer have serialization errors. Everything works properly without the future.

I realize that because of statics concurrent job submission will be an issue. But we are submitting one job only - the difference is that it's through a Future. So there is no concurrent submission unless I am missing something.

regards.

On Thu, Sep 19, 2019 at 12:54 PM Biao Liu <[hidden email]> wrote:
Hi Debasish,

I think there is something critical of your usage hided. It might help if you could provide more details.

It still confuses me how you solve the serialization issue. Why the non-transient fields only affects serialization in a future?

WRT this ProgramAbortException issue, do you submit jobs concurrently in one process?
Currently job submission is not thread-safe. It relies on some static variables which could be affected by other concurrent submissions in the same process.
Asking this because usually job submission is not through OptimizerPlanEnvironment which appears in your exception stack trace.

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Sep 2019 at 15:03, Debasish Ghosh <[hidden email]> wrote:
I think what you are pointing at is asynchronous datastream operations. In our case we want to submit the entire job in a Future. Something like the following ..

def execute(..) = {
  // this does all data stream manipulation, joins etc.
  buildComputationGraph()

  // submits for execution with StreamExecutionEnvironment
  env.execute(..)
}

and we want to do ..

val jobExecutionResultFuture = Future(execute(..))

and this gives that exception.

regards.

On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch <[hidden email]> wrote:
Hi Debasish,

Have you taken a look at the AsyncIO API for running async operations? I think this is the preferred way of doing it. [1]
So it would look something like this:

class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())


    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}
// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh <[hidden email]> wrote:
ok, the above problem was due to some serialization issues which we fixed by marking some of the things transient. This fixes the serialization issues .. But now when I try to execute in a Future I hit upon this ..

java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.complete(Promise.scala:53)
at scala.concurrent.Promise.complete$(Promise.scala:52)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
... 7 more
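
The 'Boxed Error' wrapper itself seems to be just how scala.concurrent treats fatal throwables: ProgramAbortException extends java.lang.Error, which a Future boxes into an ExecutionException instead of handling it as a normal failure. A minimal sketch that reproduces only the wrapper:

import java.util.concurrent.ExecutionException

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global

// java.lang.Error counts as fatal for scala.concurrent, so the Future boxes it
val f = Future[Unit] { throw new Error("stand-in for ProgramAbortException") }

try Await.result(f, 5.seconds)
catch {
  case e: ExecutionException => println(e) // java.util.concurrent.ExecutionException: Boxed Error
}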

I found the JIRA issue https://issues.apache.org/jira/browse/FLINK-10381 which is still open and describes a related problem. But we are not submitting multiple jobs - we are submitting a single job, just asynchronously in a Future. I am not clear why this should cause the problem I see.

Can anyone please help with an explanation?

regards.

On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh <[hidden email]> wrote:
I think the issue may not be linked to the Future after all. What happens is that when this piece of code is executed ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")

val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")

val processed: DataStream[TaxiRideFare] =
  rides
    .connect(fares)
    .flatMap(new EnrichmentFunction)

somehow the ClosureCleaner gets invoked, as evident from the following trace, which tries to serialize Avro data. Is there any way to pass in the custom Avro serializer that I am using?

org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields.
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
            at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
            at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
            at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
            at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
            at pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
            at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
            at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
            at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
            at scala.util.Try$.apply(Try.scala:213)
            at pipelines.runner.Runner$.run(Runner.scala:43)
            at pipelines.runner.Runner$.main(Runner.scala:30)
            at pipelines.runner.Runner.main(Runner.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
            at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
            at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
            at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
            at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
            at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
            at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
            at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
            at java.util.ArrayList.writeObject(ArrayList.java:766)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
            at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
            at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
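
(For context, by a custom Avro serializer I mean a registration roughly like the sketch below - though this configures Kryo for data types, so it may well not affect the Java serialization that the ClosureCleaner performs on the function object.)

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.Schema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// serializes an Avro Schema through its JSON form, since Schema itself
// is not java.io.Serializable
class AvroSchemaKryoSerializer extends Serializer[Schema] {
  override def write(kryo: Kryo, output: Output, schema: Schema): Unit =
    output.writeString(schema.toString)
  override def read(kryo: Kryo, input: Input, cls: Class[Schema]): Schema =
    new Schema.Parser().parse(input.readString())
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.addDefaultKryoSerializer(classOf[Schema], classOf[AvroSchemaKryoSerializer])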


I also tried the following ..

class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {

    @transient var rideState: ValueState[TaxiRide] = _
    @transient var fareState: ValueState[TaxiFare] = _

    override def open(params: Configuration): Unit = {
      super.open(params)
      rideState = getRuntimeContext.getState(
        new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
      fareState = getRuntimeContext.getState(
        new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
    }

    // flatMap1 / flatMap2 unchanged, elided here
}


and moved the state initialization into the open method. But I still get the same result.

Help?

regards.



On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <[hidden email]> wrote:
Hi Debasish,

I guess the reason is that something is unexpectedly pulled into serialization through a reference from an inner class (an anonymous class or a lambda expression).
When Flink serializes such an inner-class instance, it also serializes all the objects it references - for example, the enclosing outer-class instance. If that outer class is not serializable, this error occurs.

You could try moving that piece of code into a named, non-inner class.
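
For example, a contrived sketch of the difference:

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

// stands in for a non-serializable field such as org.apache.avro.Schema
class SchemaLike(val json: String)

class JobBuilder(env: StreamExecutionEnvironment) {
  val schema = new SchemaLike("""{"type":"record"}""")

  // BAD: the anonymous class reads `schema`, so it captures JobBuilder.this;
  // serializing the function then drags in the non-serializable SchemaLike
  def bad: DataStream[String] =
    env.fromElements(1, 2, 3).map(new MapFunction[Int, String] {
      override def map(x: Int): String = s"${schema.json}:$x"
    })

  // GOOD: extract the serializable piece you need and pass it to a
  // named top-level class, which holds no reference to JobBuilder
  def good: DataStream[String] =
    env.fromElements(1, 2, 3).map(new Tagger(schema.json))
}

class Tagger(tag: String) extends MapFunction[Int, String] {
  override def map(x: Int): String = s"$tag:$x"
}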

Thanks,
Biao /'bɪ.aʊ/



On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <[hidden email]> wrote:
My main question is why serialisation kicks in when I try to execute within a `Future` and not otherwise. 

regards.

On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <[hidden email]> wrote:
Yes, they are generated from an Avro schema and implement Serializable ..

On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <[hidden email]> wrote:
Does TaxiRide or TaxiRideFare implements Serializable?
