Hi, I am getting NoSerializableException in this class- public class RecordsFilterer<T extends GenericRecord> { Exception in thread "main" org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386) at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198) at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128) at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188) at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187) at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855) at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57) at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: RecordsFilterer at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275) at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843) at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331) ... 17 more Caused by: java.io.NotSerializableException: RecordsFilterer at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300) at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252) at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273) ... 19 more |
Hi Tarandeep, the exception suggests that Flink tries to serialize RecordsFilterer as a user function (this happens via Java Serialization).Instead of using CoGroup, I would use distinct and an OuterJoin like this: DataSet<String> distIds = filtereredIds.distinct(); DataSet<Tuple2<Boolean, T> result = records .leftOuterJoin(distIds) .where(KEYSELECTOR) .equalTo("*") // use full string as key .with(JOINFUNC) // set Bool to false if right == null, true otherwise Best, Fabian 2016-06-09 2:28 GMT+02:00 Tarandeep Singh <[hidden email]>:
|
Hi, the problem is that the KeySelector is an anonymous inner class and as such as a reference to the outer RecordFilterer object. Normally, this would be rectified by the closure cleaner but the cleaner is not used in CoGroup.where(). I'm afraid this is a bug. Best, Aljoscha On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <[hidden email]> wrote:
|
Thank you Aljoscha and Fabian for your replies. This is how I have got it working (with a hack): In my dataset, the join field/key can be null otherwise .where(fieldName) works and I don't get not-serializable exception. So I applied a MapFunction to DataSet and put a dummy value in the join field/key where it was null. Then In the join function, I change it back to null. @Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm afraid this is a bug", I am assuming you are referring to Flink engine itself. @Fabian: thanks for the optimization tip. On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <[hidden email]> wrote:
|
You can also make the KeySelector a static inner class. That should work as well. On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh <[hidden email]> wrote:
|
Hi, yes, I was talking about a Flink bug. I forgot to mention the work-around that Stephan mentioned. On Thu, 9 Jun 2016 at 20:38 Stephan Ewen <[hidden email]> wrote:
|
Is there an issue or a fix for proper use of the ClojureCleaner in
CoGroup.where()? On Fri, Jun 10, 2016 at 8:07 AM, Aljoscha Krettek <[hidden email]> wrote: > Hi, > yes, I was talking about a Flink bug. I forgot to mention the work-around > that Stephan mentioned. > > On Thu, 9 Jun 2016 at 20:38 Stephan Ewen <[hidden email]> wrote: >> >> You can also make the KeySelector a static inner class. That should work >> as well. >> >> On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh <[hidden email]> >> wrote: >>> >>> Thank you Aljoscha and Fabian for your replies. >>> >>> @Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm >>> afraid this is a bug", I am assuming you are referring to Flink engine >>> itself. >>> >>> @Fabian: thanks for the optimization tip. >>> >>> This is how I have got it working (with a hack): In my dataset, the join >>> field/key can be null otherwise .where(fieldName) works and I don't get >>> not-serializable exception. So I applied a MapFunction to DataSet and put a >>> dummy value in the join field/key where it was null. Then In the join >>> function, I change it back to null. >>> >>> Best, >>> Tarandeep >>> >>> On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <[hidden email]> >>> wrote: >>>> >>>> Hi, >>>> the problem is that the KeySelector is an anonymous inner class and as >>>> such as a reference to the outer RecordFilterer object. Normally, this would >>>> be rectified by the closure cleaner but the cleaner is not used in >>>> CoGroup.where(). I'm afraid this is a bug. >>>> >>>> Best, >>>> Aljoscha >>>> >>>> >>>> On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <[hidden email]> wrote: >>>>> >>>>> Hi Tarandeep, >>>>> >>>>> the exception suggests that Flink tries to serialize RecordsFilterer as >>>>> a user function (this happens via Java Serialization). >>>>> I said suggests because the code that uses RecordsFilterer is not >>>>> included. >>>>> >>>>> To me it looks like RecordsFilterer should not be used as a user >>>>> function. It is a helper class to construct a DataSet program, so it should >>>>> not be shipped for execution. >>>>> You would use such a class as follows: >>>>> >>>>> DataSet<T> records = ... >>>>> DataSet<String> filterIDs = ... >>>>> >>>>> RecordsFilterer rf = new RecordsFilterer(); >>>>> DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records, >>>>> filterIDs, "myField"); >>>>> >>>>> Regarding the join code, I would suggest an optimization. >>>>> Instead of using CoGroup, I would use distinct and an OuterJoin like >>>>> this: >>>>> >>>>> DataSet<String> distIds = filtereredIds.distinct(); >>>>> DataSet<Tuple2<Boolean, T> result = records >>>>> .leftOuterJoin(distIds) >>>>> .where(KEYSELECTOR) >>>>> .equalTo("*") // use full string as key >>>>> .with(JOINFUNC) // set Bool to false if right == null, true otherwise >>>>> >>>>> Best, Fabian >>>>> >>>>> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh <[hidden email]>: >>>>>> >>>>>> Hi, >>>>>> >>>>>> I am getting NoSerializableException in this class- >>>>>> >>>>>> >>>>>> >>>>>> public class RecordsFilterer<T extends GenericRecord> { >>>>>> >>>>>> public DataSet<Tuple2<Boolean,T>> addFilterFlag(DataSet<T> >>>>>> dataset, DataSet<String> filteredIds, String fieldName) { >>>>>> return dataset.coGroup(filteredIds) >>>>>> .where(new KeySelector<T, String>() { >>>>>> @Override >>>>>> public String getKey(T t) throws Exception { >>>>>> String s = (String) t.get(fieldName); >>>>>> return s != null ? s : >>>>>> UUID.randomUUID().toString(); >>>>>> } >>>>>> }) >>>>>> .equalTo((KeySelector<String, String>) s -> s) >>>>>> .with(new CoGroupFunction<T, String, >>>>>> Tuple2<Boolean,T>>() { >>>>>> @Override >>>>>> public void coGroup(Iterable<T> records, >>>>>> Iterable<String> ids, >>>>>> Collector<Tuple2<Boolean,T>> >>>>>> collector) throws Exception { >>>>>> boolean filterFlag = false; >>>>>> for (String id : ids) { >>>>>> filterFlag = true; >>>>>> } >>>>>> >>>>>> for (T record : records) { >>>>>> collector.collect(new Tuple2<>(filterFlag, >>>>>> record)); >>>>>> } >>>>>> } >>>>>> }); >>>>>> >>>>>> } >>>>>> } >>>>>> >>>>>> >>>>>> What I am trying to do is write a generic code that will join Avro >>>>>> records (of different types) with String records and there is a match add a >>>>>> filter flag. This way I can use the same code for different Avro record >>>>>> types. But I am getting this exception- >>>>>> >>>>>> Exception in thread "main" >>>>>> org.apache.flink.optimizer.CompilerException: Error translating node 'Map >>>>>> "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] >>>>>> ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could >>>>>> not write the user code wrapper class >>>>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : >>>>>> java.io.NotSerializableException: >>>>>> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer >>>>>> at >>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386) >>>>>> at >>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109) >>>>>> at >>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198) >>>>>> at >>>>>> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) >>>>>> at >>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) >>>>>> at >>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) >>>>>> at >>>>>> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128) >>>>>> at >>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188) >>>>>> at >>>>>> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187) >>>>>> at >>>>>> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90) >>>>>> at >>>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855) >>>>>> at >>>>>> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57) >>>>>> at >>>>>> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32) >>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>> at >>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>> at >>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>> at java.lang.reflect.Method.invoke(Method.java:497) >>>>>> at >>>>>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) >>>>>> Caused by: >>>>>> org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could >>>>>> not write the user code wrapper class >>>>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : >>>>>> java.io.NotSerializableException: RecordsFilterer >>>>>> at >>>>>> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275) >>>>>> at >>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843) >>>>>> at >>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331) >>>>>> ... 17 more >>>>>> Caused by: java.io.NotSerializableException: RecordsFilterer >>>>>> at >>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) >>>>>> at >>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) >>>>>> at >>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) >>>>>> at >>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) >>>>>> at >>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) >>>>>> at >>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) >>>>>> at >>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) >>>>>> at >>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) >>>>>> at >>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) >>>>>> at >>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) >>>>>> at >>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) >>>>>> at >>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) >>>>>> at >>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) >>>>>> at >>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) >>>>>> at >>>>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300) >>>>>> at >>>>>> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252) >>>>>> at >>>>>> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273) >>>>>> ... 19 more >>>>>> >>>>>> >>>>>> Please help me understand why I get this exception and how to fix it >>>>>> [rewrite code may be?] >>>>>> >>>>>> Thanks, >>>>>> Tarandeep >>>>>> >>>>>> >>>>> >>> >> > |
Nope, I think there is neither a fix nor an open issue for this right now. On Mon, 13 Jun 2016 at 11:31 Maximilian Michels <[hidden email]> wrote: Is there an issue or a fix for proper use of the ClojureCleaner in |
Here's the issue https://issues.apache.org/jira/browse/FLINK-4078
On Mon, Jun 13, 2016 at 12:27 PM, Aljoscha Krettek <[hidden email]> wrote: > Nope, I think there is neither a fix nor an open issue for this right now. > > On Mon, 13 Jun 2016 at 11:31 Maximilian Michels <[hidden email]> wrote: >> >> Is there an issue or a fix for proper use of the ClojureCleaner in >> CoGroup.where()? >> >> On Fri, Jun 10, 2016 at 8:07 AM, Aljoscha Krettek <[hidden email]> >> wrote: >> > Hi, >> > yes, I was talking about a Flink bug. I forgot to mention the >> > work-around >> > that Stephan mentioned. >> > >> > On Thu, 9 Jun 2016 at 20:38 Stephan Ewen <[hidden email]> wrote: >> >> >> >> You can also make the KeySelector a static inner class. That should >> >> work >> >> as well. >> >> >> >> On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh <[hidden email]> >> >> wrote: >> >>> >> >>> Thank you Aljoscha and Fabian for your replies. >> >>> >> >>> @Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm >> >>> afraid this is a bug", I am assuming you are referring to Flink engine >> >>> itself. >> >>> >> >>> @Fabian: thanks for the optimization tip. >> >>> >> >>> This is how I have got it working (with a hack): In my dataset, the >> >>> join >> >>> field/key can be null otherwise .where(fieldName) works and I don't >> >>> get >> >>> not-serializable exception. So I applied a MapFunction to DataSet and >> >>> put a >> >>> dummy value in the join field/key where it was null. Then In the join >> >>> function, I change it back to null. >> >>> >> >>> Best, >> >>> Tarandeep >> >>> >> >>> On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <[hidden email]> >> >>> wrote: >> >>>> >> >>>> Hi, >> >>>> the problem is that the KeySelector is an anonymous inner class and >> >>>> as >> >>>> such as a reference to the outer RecordFilterer object. Normally, >> >>>> this would >> >>>> be rectified by the closure cleaner but the cleaner is not used in >> >>>> CoGroup.where(). I'm afraid this is a bug. >> >>>> >> >>>> Best, >> >>>> Aljoscha >> >>>> >> >>>> >> >>>> On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <[hidden email]> wrote: >> >>>>> >> >>>>> Hi Tarandeep, >> >>>>> >> >>>>> the exception suggests that Flink tries to serialize RecordsFilterer >> >>>>> as >> >>>>> a user function (this happens via Java Serialization). >> >>>>> I said suggests because the code that uses RecordsFilterer is not >> >>>>> included. >> >>>>> >> >>>>> To me it looks like RecordsFilterer should not be used as a user >> >>>>> function. It is a helper class to construct a DataSet program, so it >> >>>>> should >> >>>>> not be shipped for execution. >> >>>>> You would use such a class as follows: >> >>>>> >> >>>>> DataSet<T> records = ... >> >>>>> DataSet<String> filterIDs = ... >> >>>>> >> >>>>> RecordsFilterer rf = new RecordsFilterer(); >> >>>>> DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records, >> >>>>> filterIDs, "myField"); >> >>>>> >> >>>>> Regarding the join code, I would suggest an optimization. >> >>>>> Instead of using CoGroup, I would use distinct and an OuterJoin like >> >>>>> this: >> >>>>> >> >>>>> DataSet<String> distIds = filtereredIds.distinct(); >> >>>>> DataSet<Tuple2<Boolean, T> result = records >> >>>>> .leftOuterJoin(distIds) >> >>>>> .where(KEYSELECTOR) >> >>>>> .equalTo("*") // use full string as key >> >>>>> .with(JOINFUNC) // set Bool to false if right == null, true >> >>>>> otherwise >> >>>>> >> >>>>> Best, Fabian >> >>>>> >> >>>>> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh <[hidden email]>: >> >>>>>> >> >>>>>> Hi, >> >>>>>> >> >>>>>> I am getting NoSerializableException in this class- >> >>>>>> >> >>>>>> >> >>>>>> >> >>>>>> public class RecordsFilterer<T extends GenericRecord> { >> >>>>>> >> >>>>>> public DataSet<Tuple2<Boolean,T>> addFilterFlag(DataSet<T> >> >>>>>> dataset, DataSet<String> filteredIds, String fieldName) { >> >>>>>> return dataset.coGroup(filteredIds) >> >>>>>> .where(new KeySelector<T, String>() { >> >>>>>> @Override >> >>>>>> public String getKey(T t) throws Exception { >> >>>>>> String s = (String) t.get(fieldName); >> >>>>>> return s != null ? s : >> >>>>>> UUID.randomUUID().toString(); >> >>>>>> } >> >>>>>> }) >> >>>>>> .equalTo((KeySelector<String, String>) s -> s) >> >>>>>> .with(new CoGroupFunction<T, String, >> >>>>>> Tuple2<Boolean,T>>() { >> >>>>>> @Override >> >>>>>> public void coGroup(Iterable<T> records, >> >>>>>> Iterable<String> ids, >> >>>>>> >> >>>>>> Collector<Tuple2<Boolean,T>> >> >>>>>> collector) throws Exception { >> >>>>>> boolean filterFlag = false; >> >>>>>> for (String id : ids) { >> >>>>>> filterFlag = true; >> >>>>>> } >> >>>>>> >> >>>>>> for (T record : records) { >> >>>>>> collector.collect(new >> >>>>>> Tuple2<>(filterFlag, >> >>>>>> record)); >> >>>>>> } >> >>>>>> } >> >>>>>> }); >> >>>>>> >> >>>>>> } >> >>>>>> } >> >>>>>> >> >>>>>> >> >>>>>> What I am trying to do is write a generic code that will join Avro >> >>>>>> records (of different types) with String records and there is a >> >>>>>> match add a >> >>>>>> filter flag. This way I can use the same code for different Avro >> >>>>>> record >> >>>>>> types. But I am getting this exception- >> >>>>>> >> >>>>>> Exception in thread "main" >> >>>>>> org.apache.flink.optimizer.CompilerException: Error translating >> >>>>>> node 'Map >> >>>>>> "Key Extractor" : MAP [[ GlobalProperties >> >>>>>> [partitioning=RANDOM_PARTITIONED] >> >>>>>> ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] >> >>>>>> ]]': Could >> >>>>>> not write the user code wrapper class >> >>>>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : >> >>>>>> java.io.NotSerializableException: >> >>>>>> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855) >> >>>>>> at >> >>>>>> >> >>>>>> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57) >> >>>>>> at >> >>>>>> >> >>>>>> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32) >> >>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> >>>>>> at >> >>>>>> >> >>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> >>>>>> at >> >>>>>> >> >>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >>>>>> at java.lang.reflect.Method.invoke(Method.java:497) >> >>>>>> at >> >>>>>> >> >>>>>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) >> >>>>>> Caused by: >> >>>>>> >> >>>>>> org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could >> >>>>>> not write the user code wrapper class >> >>>>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : >> >>>>>> java.io.NotSerializableException: RecordsFilterer >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331) >> >>>>>> ... 17 more >> >>>>>> Caused by: java.io.NotSerializableException: RecordsFilterer >> >>>>>> at >> >>>>>> >> >>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) >> >>>>>> at >> >>>>>> >> >>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) >> >>>>>> at >> >>>>>> >> >>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) >> >>>>>> at >> >>>>>> >> >>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) >> >>>>>> at >> >>>>>> >> >>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) >> >>>>>> at >> >>>>>> >> >>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) >> >>>>>> at >> >>>>>> >> >>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) >> >>>>>> at >> >>>>>> >> >>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) >> >>>>>> at >> >>>>>> >> >>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) >> >>>>>> at >> >>>>>> >> >>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) >> >>>>>> at >> >>>>>> >> >>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) >> >>>>>> at >> >>>>>> >> >>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) >> >>>>>> at >> >>>>>> >> >>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) >> >>>>>> at >> >>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252) >> >>>>>> at >> >>>>>> >> >>>>>> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273) >> >>>>>> ... 19 more >> >>>>>> >> >>>>>> >> >>>>>> Please help me understand why I get this exception and how to fix >> >>>>>> it >> >>>>>> [rewrite code may be?] >> >>>>>> >> >>>>>> Thanks, >> >>>>>> Tarandeep >> >>>>>> >> >>>>>> >> >>>>> >> >>> >> >> >> > |
Free forum by Nabble | Edit this page |