Hi Flinksters,

I would like to randomly choose an element of my data set. But somehow I cannot use scala.util inside my filter functions:

    val sample_x = X filter (new RichFilterFunction[Vector]() {
      var i: Int = -1
      override def open(config: Configuration) = {
        i = scala.util.Random.nextInt(N)
      }
      def filter(a: Vector) = a.id == i
    })

    val sample_y = Y filter (new RichFilterFunction[Vector]() {
      def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
    })

Exception in thread "main" org.apache.flink.optimizer.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: Error translating node 'Filter "Filter at Test$$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
    at Test$delayedInit$body.apply(test.scala:304)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at Test$.main(test.scala:45)
    at Test.main(test.scala)
Caused by: org.apache.flink.optimizer.CompilerException: Error translating node 'Filter "Filter at Test$$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
    at org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
    ... 21 more
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
    ... 26 more
Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)

Cheers,
Max
Hi Max,

the problem is that you're trying to serialize the companion object of [...]

Cheers,

On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber <alber.maximilian@...> wrote:
Hi everyone,

I did not reproduce it, but I think the problem here is rather the anonymous class. It looks like it is created within a class, not an object, so it is not "static" in Java terms, which means that its surrounding class (the job class) gets serialized along with it. And that job class seems to have a DataSet field, which cannot be serialized. If that really is the problem, you should either define your anonymous class within the companion object of your job class, or switch to a plain function (and make sure that you do not pass a variable from your job class into the scope of the function).

Cheers,
Sebastian
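(For illustration, a minimal sketch of the first option Sebastian describes: the filter class lives in the companion object, so serializing it does not drag in an enclosing job instance. The Vector case class, its fields, the RandomIdFilter name, and n = 100 are placeholders, not taken from the thread.)

    import org.apache.flink.api.common.functions.RichFilterFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    object Test {

      // Hypothetical stand-in for the poster's Vector type.
      case class Vector(id: Int, values: Array[Double])

      // Declared inside the (companion) object, so the filter carries no reference
      // to an enclosing job instance and its non-serializable DataSet fields.
      class RandomIdFilter(n: Int) extends RichFilterFunction[Vector] {
        private var i: Int = -1

        override def open(config: Configuration): Unit = {
          // Note: each parallel task instance draws its own random id.
          i = scala.util.Random.nextInt(n)
        }

        override def filter(a: Vector): Boolean = a.id == i
      }

      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val n = 100
        val x: DataSet[Vector] =
          env.fromCollection((0 until n).map(id => Vector(id, Array(id.toDouble))))

        // Only the Int n is captured; no DataSet ends up in the serialized function.
        val sampleX = x.filter(new RandomIdFilter(n))
        println(sampleX.collect())
      }
    }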
Hi everyone!

Thanks! It seems it was the variable that caused the problem. Making it an inner class solved the issue.

On Mon, Jun 15, 2015 at 2:58 PM, Kruse, Sebastian <[hidden email]> wrote:
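(For reference, a sketch of the other option from Sebastian's mail: a plain function whose closure contains nothing from the job class. It reuses the hypothetical Vector case class from the sketch above; SampleByRandomId, sampleOne, and the parameter names are made up.)

    import org.apache.flink.api.scala.DataSet

    object SampleByRandomId {
      // Draw the random id once on the client; the lambda captures only the local Int,
      // so no job-class fields or DataSets get pulled into the serialized closure.
      def sampleOne(x: DataSet[Test.Vector], n: Int): DataSet[Test.Vector] = {
        val i = scala.util.Random.nextInt(n)
        x.filter(_.id == i)
      }
    }

Unlike the open()-based version, the id here is drawn once on the client, so every parallel instance filters on the same value.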
Actually, the closure cleaner is supposed to take care of the "anonymous inner class" situation. Did you deactivate that one, by any chance?

On Mon, Jun 15, 2015 at 5:31 PM, Maximilian Alber <[hidden email]> wrote:
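(In case it helps to check: the switch Stephan mentions lives on the ExecutionConfig. A sketch, assuming the disableClosureCleaner()/isClosureCleanerEnabled() accessors are available in the Flink version used here.)

    import org.apache.flink.api.scala.ExecutionEnvironment

    object ClosureCleanerCheck {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // The closure cleaner is on by default; it is only off if something like
        // env.getConfig.disableClosureCleaner() was called explicitly.
        println(env.getConfig.isClosureCleanerEnabled)
      }
    }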
No clue. I used the current branch, aka 0.9-SNAPSHOT. Or is this something related to Scala?

On Mon, Jun 22, 2015 at 4:45 PM, Stephan Ewen <[hidden email]> wrote: