Random Selection


Random Selection

Maximilian Alber
Hi Flinksters,

I would like to randomly choose an element of my data set, but somehow I cannot use scala.util inside my filter functions:

      val sample_x = X filter(new RichFilterFunction[Vector](){
        var i: Int = -1

        override def open(config: Configuration) = {
          i = scala.util.Random.nextInt(N)
        }
        def filter(a: Vector) = a.id == i
      })
      val sample_y = Y filter(new RichFilterFunction[Vector](){
        def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
      })

That's the error I get:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: Error translating node 'Filter "Filter at Test$$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
    at Test$delayedInit$body.apply(test.scala:304)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at Test$.main(test.scala:45)
    at Test.main(test.scala)
Caused by: org.apache.flink.optimizer.CompilerException: Error translating node 'Filter "Filter at Test$$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
    at org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
    ... 21 more
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
    ... 26 more
Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)


Did I miss something, or is it simply not possible?
Thanks!
Cheers,
Max

Re: Random Selection

Till Rohrmann

Hi Max,

The problem is that you’re trying to serialize the companion object of scala.util.Random. Try creating an instance of the scala.util.Random class and using that instance within your RichFilterFunction to generate the random numbers.

Cheers,
Till
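
For readers following along, the suggestion above could look roughly like the sketch below. It is plain Scala with no Flink dependency: `SimpleRichFilter`, `Vec`, `RandomIdFilter` and the `seed` parameter are hypothetical stand-ins introduced only for illustration, not Flink API. Note that later replies in this thread point to a different cause for the exception (the enclosing class being serialized), so this may improve the sampling logic without being what fixes the error.

```scala
import scala.util.Random

// Hypothetical stand-in for a rich filter function with an open() hook.
// It is NOT the Flink interface; it only mimics its shape.
trait SimpleRichFilter[T] extends Serializable {
  def open(): Unit
  def filter(value: T): Boolean
}

// Hypothetical element type with an integer id, as in the original post.
final case class Vec(id: Int)

// A named filter class that owns its random generator.
// n    = number of ids to choose from (ids are assumed to be 0 until n)
// seed = assumed extra parameter so that every instance picks the same id
final class RandomIdFilter(n: Int, seed: Long) extends SimpleRichFilter[Vec] {
  private var target: Int = -1

  // Draw the id once, before any element is filtered.
  override def open(): Unit = {
    val rng = new Random(seed) // an instance, not the Random companion object
    target = rng.nextInt(n)
  }

  // Keep only the element whose id matches the drawn target.
  override def filter(v: Vec): Boolean = v.id == target
}
```

Drawing the id once in `open()` selects exactly one id. Calling `nextInt` inside `filter`, as in `sample_y` from the original post, draws a fresh number for every element and may therefore keep zero or several elements. The fixed seed is a design choice for the parallel case: `open()` runs once per parallel instance, and unseeded generators would each pick a different id.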


RE: Random Selection

Kruse, Sebastian

Hi everyone,

I have not reproduced it, but I think the problem here is rather the anonymous class. It looks like it is created within a class, not an object. Thus it is not “static” in Java terms, which means that its surrounding class (the job class) will be serialized as well. And this job class seems to have a DataSet field, which cannot be serialized.

If that really is the problem, you should either define your anonymous class within the companion object of your job class or use a function directly (and make sure that you do not pass a variable from your job class into the scope of the function).

Cheers,

Sebastian
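
The mechanism described above can be reproduced without Flink, using only Java serialization. The following is a minimal sketch under stated assumptions: `SimpleFilter`, `Handle`, `JobClass`, `LessThan` and `SerializationCheck` are hypothetical names made up for this illustration, with `Handle` playing the role of the non-serializable DataSet field. As the fix it uses a named top-level class that receives the value as a constructor argument, which is one of several ways to avoid the reference to the enclosing instance.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a filter function; only Serializable matters here.
trait SimpleFilter[T] extends Serializable {
  def filter(value: T): Boolean
}

// Hypothetical stand-in for a non-serializable field such as a DataSet.
class Handle

// A job written as a class. The anonymous filter reads the field `n`,
// so it holds a reference to the enclosing JobClass instance, and
// serializing the filter also tries to serialize `handle`.
class JobClass extends Serializable {
  val handle: Handle = new Handle // not Serializable
  val n: Int = 10

  def makeFilter(): SimpleFilter[Int] = new SimpleFilter[Int] {
    def filter(value: Int): Boolean = value < n
  }
}

// A named top-level class: it has no enclosing instance, and the value
// it needs is copied in through the constructor.
final class LessThan(n: Int) extends SimpleFilter[Int] {
  def filter(value: Int): Boolean = value < n
}

object SerializationCheck {
  // Returns true if Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Calling `SerializationCheck.serializes(new JobClass().makeFilter())` should fail because of the captured `Handle`, while `SerializationCheck.serializes(new LessThan(10))` should succeed, since the filter carries only an `Int`.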

 


Re: Random Selection

Maximilian Alber
Hi everyone!
Thanks! It seems it was the variable that caused the problem. Making an inner class solved the issue.
Cheers,
Max


Re: Random Selection

Stephan Ewen
Actually, the closure cleaner is supposed to take care of the "anonymous inner class" situation.

Did you deactivate that one, by any chance?

On Mon, Jun 15, 2015 at 5:31 PM, Maximilian Alber <[hidden email]> wrote:
Hi everyone!
Thanks! It seems the variable that makes the problems. Making an inner class solved the issue.
Cheers,
Max

On Mon, Jun 15, 2015 at 2:58 PM, Kruse, Sebastian <[hidden email]> wrote:

Hi everyone,

 

I did not reenact it, but I think the problem here is rather the anonymous class. It looks like it is created within a class, not an object. Thus it is not “static” in Java terms, which means that also its surrounding class (the job class) will be serialized. And in this job class, there seems to be a DataSet field, that cannot be serialized.

 

If that really is the problem, you should either define your anonymous class within the companion object of your job class or resort directly to a function (and make sure that you do not pass a variable from your job class into the scope of the function).

 

Cheers,

Sebastian

 

From: Till Rohrmann [mailto:[hidden email]]
Sent: Montag, 15. Juni 2015 14:16
To: [hidden email]
Subject: Re: Random Selection

 

Hi Max,

the problem is that you’re trying to serialize the companion object of scala.util.Random. Try to create an instance of the scala.util.Random class and use this instance within your RIchFilterFunction to generate the random numbers.

Cheers,
Till

On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber alber.maximilian@... wrote:

Hi Flinksters,

I would like to randomly choose a element of my data set. But somehow I cannot use scala.util inside my filter functions:

      val sample_x = X filter(new RichFilterFunction[Vector](){
        var i: Int = -1

        override def open(config: Configuration) = {
          i = scala.util.Random.nextInt(N)
        }
        def filter(a: Vector) = a.id == i
      })
      val sample_y = Y filter(new RichFilterFunction[Vector](){
        def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
      })

That's the error I get:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: Error translating node 'Filter "Filter at Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
    at Test$delayedInit$body.apply(test.scala:304)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at Test$.main(test.scala:45)
    at Test.main(test.scala)
Caused by: org.apache.flink.optimizer.CompilerException: Error translating node 'Filter "Filter at Test$$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
    at org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
    ... 21 more
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
    ... 26 more
Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)

Did I miss something, or is it simply not possible?

Thanks!

Cheers,

Max



Re: Random Selection

Maximilian Alber
No clue. I used the current branch aka 0.9-SNAPSHOT.
Or is this something related to Scala?

On Mon, Jun 22, 2015 at 4:45 PM, Stephan Ewen <[hidden email]> wrote:
Actually, the closure cleaner is supposed to take care of the "anonymous inner class" situation.

Did you deactivate that one, by any chance?

On Mon, Jun 15, 2015 at 5:31 PM, Maximilian Alber <[hidden email]> wrote:
Hi everyone!
Thanks! It seems the variable was causing the problem. Making an inner class solved the issue.
Cheers,
Max

On Mon, Jun 15, 2015 at 2:58 PM, Kruse, Sebastian <[hidden email]> wrote:

Hi everyone,

I did not reproduce it, but I think the problem here is rather the anonymous class. It looks like it is created within a class, not an object. Thus it is not “static” in Java terms, which means that its surrounding class (the job class) will be serialized as well. And in this job class, there seems to be a DataSet field that cannot be serialized.

If that really is the problem, you should either define your anonymous class within the companion object of your job class or resort directly to a function (and make sure that you do not pass a variable from your job class into the scope of the function).
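
The effect described here can be reproduced without Flink. The following is a minimal plain-Scala sketch, not the original job: `IdFilter`, `Job`, `FixedId`, and `SerializationCheck` are hypothetical names, and `Job` merely stands in for a job class that is not serializable (for example because it holds a DataSet).

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a serializable user function such as a filter.
trait IdFilter extends Serializable {
  def filter(id: Int): Boolean
}

// Stand-in for the job class; it is deliberately NOT Serializable.
class Job(val n: Int) {
  // The anonymous class reads `n` from the enclosing instance, so it keeps
  // a reference to the whole Job and drags it into serialization.
  def capturing: IdFilter = new IdFilter {
    def filter(id: Int): Boolean = id == n
  }
}

object Job {
  // Defined in the companion object: the value is passed in explicitly,
  // so no reference to a Job instance is needed.
  final class FixedId(target: Int) extends IdFilter {
    def filter(id: Int): Boolean = id == target
  }
}

object SerializationCheck {
  // Returns true if Java serialization succeeds for the given object.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

With these definitions, `SerializationCheck.canSerialize(new Job(3).capturing)` fails because the enclosing `Job` is serialized along with the filter, while `SerializationCheck.canSerialize(new Job.FixedId(3))` succeeds.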

Cheers,

Sebastian

From: Till Rohrmann [mailto:[hidden email]]
Sent: Monday, June 15, 2015 14:16
To: [hidden email]
Subject: Re: Random Selection

Hi Max,

the problem is that you’re trying to serialize the companion object of scala.util.Random. Try creating an instance of the scala.util.Random class and using that instance within your RichFilterFunction to generate the random numbers.

Cheers,
Till
