NotSerializableException


NotSerializableException

Tarandeep Singh
Hi,

I am getting a NotSerializableException in this class:


import java.util.UUID;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class RecordsFilterer<T extends GenericRecord> {

    public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset, DataSet<String> filteredIds, String fieldName) {
        return dataset.coGroup(filteredIds)
                .where(new KeySelector<T, String>() {
                    @Override
                    public String getKey(T t) throws Exception {
                        String s = (String) t.get(fieldName);
                        // Null keys would break key extraction, so substitute a random
                        // UUID that will never match a filtered id.
                        return s != null ? s : UUID.randomUUID().toString();
                    }
                })
                .equalTo((KeySelector<String, String>) s -> s)
                .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
                    @Override
                    public void coGroup(Iterable<T> records, Iterable<String> ids,
                                        Collector<Tuple2<Boolean, T>> collector) throws Exception {
                        // The flag is true iff at least one matching id exists.
                        boolean filterFlag = false;
                        for (String id : ids) {
                            filterFlag = true;
                        }
                        for (T record : records) {
                            collector.collect(new Tuple2<>(filterFlag, record));
                        }
                    }
                });
    }
}

What I am trying to do is write generic code that joins Avro records (of different types) with String records and, if there is a match, adds a filter flag. This way I can use the same code for different Avro record types. But I am getting this exception:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
    at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
    at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: RecordsFilterer
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
    ... 17 more
Caused by: java.io.NotSerializableException: RecordsFilterer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
    ... 19 more


Please help me understand why I get this exception and how to fix it (maybe by rewriting the code?).

Thanks,
Tarandeep



Re: NotSerializableException

Fabian Hueske-2
Hi Tarandeep,

the exception suggests that Flink tries to serialize RecordsFilterer as a user function (this happens via Java serialization).
I say "suggests" because the code that uses RecordsFilterer is not included.

To me it looks like RecordsFilterer should not be used as a user function. It is a helper class that constructs a DataSet program, so it should not be shipped for execution.
You would use such a class as follows:

DataSet<T> records = ...
DataSet<String> filterIDs = ...

RecordsFilterer rf = new RecordsFilterer();
DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records, filterIDs, "myField");

Regarding the join code, I would suggest an optimization.
Instead of using CoGroup, I would use distinct and an OuterJoin like this:

DataSet<String> distIds = filteredIds.distinct();
DataSet<Tuple2<Boolean, T>> result = records
  .leftOuterJoin(distIds)
  .where(KEYSELECTOR)
  .equalTo("*") // use full string as key
  .with(JOINFUNC) // set Bool to false if right == null, true otherwise

Best, Fabian

2016-06-09 2:28 GMT+02:00 Tarandeep Singh <[hidden email]>:




Re: NotSerializableException

Aljoscha Krettek
Hi,
the problem is that the KeySelector is an anonymous inner class and as such has a reference to the outer RecordsFilterer object. Normally, this would be rectified by the closure cleaner, but the cleaner is not used in CoGroup.where(). I'm afraid this is a bug.

Best,
Aljoscha
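[Editor's note: Aljoscha's point can be reproduced with plain JDK serialization, no Flink required. The sketch below (all class and method names are made up for illustration) shows an anonymous inner function failing to serialize because it drags in its non-serializable enclosing object, while an equivalent static nested class serializes fine. Here the anonymous class explicitly reads an outer field to force the capture on any compiler; the compilers of this thread's era captured the enclosing instance even when it was never referenced, which is exactly what hit the Flink code.]

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // A serializable single-method function type, standing in for Flink's KeySelector.
    interface SerFn extends Serializable {
        String apply(String s);
    }

    // Non-serializable outer class, playing the role of RecordsFilterer.
    static class Helper {
        private final String tag = "helper";

        SerFn anonymous() {
            // Anonymous inner class: it holds a hidden this$0 field pointing at
            // the enclosing Helper instance (referencing 'tag' forces the capture).
            return new SerFn() {
                @Override
                public String apply(String s) {
                    return tag + ":" + s;
                }
            };
        }
    }

    // Static nested class: no hidden reference to any enclosing instance.
    static class StaticFn implements SerFn {
        @Override
        public String apply(String s) {
            return s;
        }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;  // some object in the graph is not Serializable
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Helper().anonymous())); // false: drags in Helper
        System.out.println(serializes(new StaticFn()));           // true
    }
}
```

This is exactly the shape of the failure in the stack trace above: the KeySelector is serializable itself, but the hidden reference to the non-serializable RecordsFilterer is not.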


On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <[hidden email]> wrote:



Re: NotSerializableException

Tarandeep Singh
Thank you Aljoscha and Fabian for your replies.

@Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm afraid this is a bug", I am assuming you are referring to the Flink engine itself.

@Fabian: thanks for the optimization tip.

This is how I got it working (with a hack): in my dataset, the join field/key can be null; otherwise .where(fieldName) works and I don't get the NotSerializableException. So I applied a MapFunction to the DataSet that puts a dummy value in the join field/key wherever it is null. Then, in the join function, I change it back to null.

Best,
Tarandeep
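[Editor's note: the workaround described above can be sketched in plain Java. The sentinel value and all names here are hypothetical; in the real job the first function would live in the Flink MapFunction applied before the join, and the second inside the join function.]

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class NullKeyWorkaround {

    // Hypothetical sentinel; any value that can never occur as a real key works.
    static final String NULL_SENTINEL = "__NULL__";

    // Before the join (MapFunction step): replace null keys with the sentinel
    // so key extraction never sees a null.
    static String toSentinel(String key) {
        return key == null ? NULL_SENTINEL : key;
    }

    // Inside the join function: restore the original null.
    static String fromSentinel(String key) {
        return NULL_SENTINEL.equals(key) ? null : key;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", null, "b");
        List<String> mapped = keys.stream()
                .map(NullKeyWorkaround::toSentinel)
                .collect(Collectors.toList());
        System.out.println(mapped);   // [a, __NULL__, b]
        List<String> restored = mapped.stream()
                .map(NullKeyWorkaround::fromSentinel)
                .collect(Collectors.toList());
        System.out.println(restored); // [a, null, b]
    }
}
```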

On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <[hidden email]> wrote:





Re: NotSerializableException

Stephan Ewen
You can also make the KeySelector a static inner class. That should work as well.
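[Editor's note: applied to the original code, Stephan's suggestion looks roughly like the sketch below. To keep it compilable without Flink on the classpath, minimal stand-ins for Flink's KeySelector and Avro's GenericRecord are declared locally, and FieldKeySelector is a hypothetical name; in the real job the class would implement org.apache.flink.api.java.functions.KeySelector and be passed to .where(...).]

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.UUID;

public class StaticKeySelectorDemo {

    // Minimal stand-in for org.apache.flink.api.java.functions.KeySelector,
    // declared here only so the sketch compiles without Flink.
    interface KeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    // Minimal stand-in for Avro's GenericRecord: get a field by name.
    interface Record {
        Object get(String fieldName);
    }

    // Static nested class: no hidden reference to the enclosing object, so
    // Java serialization only has to write the fieldName string.
    static class FieldKeySelector<T extends Record> implements KeySelector<T, String> {
        private final String fieldName;

        FieldKeySelector(String fieldName) {
            this.fieldName = fieldName;
        }

        @Override
        public String getKey(T t) throws Exception {
            String s = (String) t.get(fieldName);
            // Same null handling as the original anonymous KeySelector.
            return s != null ? s : UUID.randomUUID().toString();
        }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        FieldKeySelector<Record> sel = new FieldKeySelector<>("user_id");
        System.out.println(serializes(sel)); // true: only fieldName is written
        Record r = name -> "user_id".equals(name) ? "u42" : null;
        System.out.println(sel.getKey(r));   // u42
    }
}
```

In addFilterFlag, the anonymous KeySelector would then be replaced by new FieldKeySelector<>(fieldName), which carries only the field name and no reference to RecordsFilterer.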

On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh <[hidden email]> wrote:
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
    ... 19 more


Please help me understand why I get this exception and how to fix it (perhaps by rewriting the code?).

Thanks,
Tarandeep






Re: NotSerializableException

Aljoscha Krettek
Hi,
yes, I was talking about a Flink bug. I forgot to mention the work-around that Stephan mentioned.

On Thu, 9 Jun 2016 at 20:38 Stephan Ewen <[hidden email]> wrote:
You can also make the KeySelector a static inner class. That should work as well.

On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh <[hidden email]> wrote:
Thank you Aljoscha and Fabian for your replies.

@Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm afraid this is a bug", I am assuming you are referring to the Flink engine itself.

@Fabian: thanks for the optimization tip.

This is how I have got it working (with a hack): in my dataset, the join field/key can be null; otherwise .where(fieldName) works and I don't get the not-serializable exception. So I applied a MapFunction to the DataSet and put a dummy value in the join field/key wherever it was null. Then, in the join function, I change it back to null.
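For reference, that workaround amounts to something like the following sketch. The sentinel value is made up for illustration; any value that can never occur as a real key would do:

```java
public class NullKeyWorkaround {
    // Assumed sentinel; must be a value that cannot occur as a real key.
    static final String NULL_SENTINEL = "__NULL_KEY__";

    // Applied (e.g. in a MapFunction) before the join: replace null keys.
    static String toJoinKey(String key) {
        return key == null ? NULL_SENTINEL : key;
    }

    // Applied inside the join function: restore the original null.
    static String fromJoinKey(String key) {
        return NULL_SENTINEL.equals(key) ? null : key;
    }

    public static void main(String[] args) {
        System.out.println(toJoinKey(null));             // __NULL_KEY__
        System.out.println(fromJoinKey("__NULL_KEY__")); // null
        System.out.println("abc".equals(fromJoinKey(toJoinKey("abc")))); // true
    }
}
```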

Best,
Tarandeep

On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
the problem is that the KeySelector is an anonymous inner class and as such has a reference to the outer RecordsFilterer object. Normally, this would be rectified by the closure cleaner, but the cleaner is not used in CoGroup.where(). I'm afraid this is a bug.
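The capture described here can be reproduced with plain JDK serialization, independent of Flink (the class names below are illustrative stand-ins for RecordsFilterer and the KeySelector). It also shows why Stephan's static-inner-class workaround avoids the exception:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    interface SerializableFunction extends Serializable {
        String apply(String s);
    }

    // Stand-in for RecordsFilterer: a helper class that is NOT Serializable.
    static class Helper {
        // An anonymous inner class declared in an instance context gets a
        // hidden reference to the enclosing Helper instance, so serializing
        // it tries to serialize Helper too -> NotSerializableException.
        SerializableFunction anonymousSelector() {
            return new SerializableFunction() {
                @Override
                public String apply(String s) { return s; }
            };
        }

        // A static nested class has no hidden outer reference.
        static class StaticSelector implements SerializableFunction {
            @Override
            public String apply(String s) { return s; }
        }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // java.io.NotSerializableException: CaptureDemo$Helper
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Helper().anonymousSelector())); // false
        System.out.println(serializes(new Helper.StaticSelector()));      // true
    }
}
```

The closure cleaner normally nulls out such unused outer references before shipping a user function; where it is not applied, making the function a static nested (or top-level) class removes the reference at the source.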

Best,
Aljoscha


On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <[hidden email]> wrote:
Hi Tarandeep,

the exception suggests that Flink tries to serialize RecordsFilterer as a user function (this happens via Java Serialization).
I said "suggests" because the code that uses RecordsFilterer is not included.

To me it looks like RecordsFilterer should not be used as a user function. It is a helper class to construct a DataSet program, so it should not be shipped for execution.
You would use such a class as follows:

DataSet<T> records = ...
DataSet<String> filterIDs = ...

RecordsFilterer rf = new RecordsFilterer();
DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records, filterIDs, "myField");

Regarding the join code, I would suggest an optimization.
Instead of using CoGroup, I would use distinct and an OuterJoin like this:

DataSet<String> distIds = filtereredIds.distinct();
DataSet<Tuple2<Boolean, T>> result = records
  .leftOuterJoin(distIds)
  .where(KEYSELECTOR)
  .equalTo("*") // use full string as key
  .with(JOINFUNC) // set Bool to false if right == null, true otherwise
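Semantically, the distinct-plus-left-outer-join plan boils down to the following plain-Java model (not Flink operators; names are illustrative). The flag for each record is simply "did a matching id exist on the right side":

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class LeftOuterJoinSketch {
    // For each record emit (flag, record), where flag is true iff its key
    // appears in filteredIds -- the logic JOINFUNC would implement.
    static List<Map.Entry<Boolean, String>> addFilterFlag(
            List<String> records, List<String> filteredIds,
            Function<String, String> keySelector) {
        Set<String> distIds = new HashSet<>(filteredIds); // distinct()
        List<Map.Entry<Boolean, String>> out = new ArrayList<>();
        for (String record : records) {
            // In the left outer join, the right side is null when no id
            // matched; the flag is "right != null".
            boolean flag = distIds.contains(keySelector.apply(record));
            out.add(new SimpleEntry<>(flag, record));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(addFilterFlag(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("b", "b", "c"),
                s -> s)); // [false=a, true=b, true=c]
    }
}
```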

Best, Fabian

2016-06-09 2:28 GMT+02:00 Tarandeep Singh <[hidden email]>:

Re: NotSerializableException

Maximilian Michels
Is there an issue or a fix for proper use of the ClosureCleaner in CoGroup.where()?

On Fri, Jun 10, 2016 at 8:07 AM, Aljoscha Krettek <[hidden email]> wrote:

> Hi,
> yes, I was talking about a Flink bug. I forgot to mention the work-around
> that Stephan mentioned.
>
> On Thu, 9 Jun 2016 at 20:38 Stephan Ewen <[hidden email]> wrote:
>>
>> You can also make the KeySelector a static inner class. That should work
>> as well.
>>
>> On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh <[hidden email]>
>> wrote:
>>>
>>> Thank you Aljoscha and Fabian for your replies.
>>>
>>> @Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm
>>> afraid this is a bug", I am assuming you are referring to Flink engine
>>> itself.
>>>
>>> @Fabian: thanks for the optimization tip.
>>>
>>> This is how I have got it working (with a hack): In my dataset, the join
>>> field/key can be null otherwise .where(fieldName) works and I don't get
>>> not-serializable exception. So I applied a MapFunction to DataSet and put a
>>> dummy value in the join field/key where it was null. Then In the join
>>> function, I change it back to null.
>>>
>>> Best,
>>> Tarandeep
>>>
>>> On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <[hidden email]>
>>> wrote:
>>>>
>>>> Hi,
>>>> the problem is that the KeySelector is an anonymous inner class and as
>>>> such as a reference to the outer RecordFilterer object. Normally, this would
>>>> be rectified by the closure cleaner but the cleaner is not used in
>>>> CoGroup.where(). I'm afraid this is a bug.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>> On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <[hidden email]> wrote:
>>>>>
>>>>> Hi Tarandeep,
>>>>>
>>>>> the exception suggests that Flink tries to serialize RecordsFilterer as
>>>>> a user function (this happens via Java Serialization).
>>>>> I said suggests because the code that uses RecordsFilterer is not
>>>>> included.
>>>>>
>>>>> To me it looks like RecordsFilterer should not be used as a user
>>>>> function. It is a helper class to construct a DataSet program, so it should
>>>>> not be shipped for execution.
>>>>> You would use such a class as follows:
>>>>>
>>>>> DataSet<T> records = ...
>>>>> DataSet<String> filterIDs = ...
>>>>>
>>>>> RecordsFilterer rf = new RecordsFilterer();
>>>>> DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records,
>>>>> filterIDs, "myField");
>>>>>
>>>>> Regarding the join code, I would suggest an optimization.
>>>>> Instead of using CoGroup, I would use distinct and an OuterJoin like
>>>>> this:
>>>>>
>>>>> DataSet<String> distIds = filtereredIds.distinct();
>>>>> DataSet<Tuple2<Boolean, T> result = records
>>>>>   .leftOuterJoin(distIds)
>>>>>   .where(KEYSELECTOR)
>>>>>   .equalTo("*") // use full string as key
>>>>>   .with(JOINFUNC) // set Bool to false if right == null, true otherwise
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh <[hidden email]>:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am getting NoSerializableException in this class-
>>>>>>
>>>>>> 
>>>>>>
>>>>>> public class RecordsFilterer<T extends GenericRecord> {
>>>>>>
>>>>>>     public DataSet<Tuple2<Boolean,T>> addFilterFlag(DataSet<T>
>>>>>> dataset, DataSet<String> filteredIds, String fieldName) {
>>>>>>         return dataset.coGroup(filteredIds)
>>>>>>                 .where(new KeySelector<T, String>() {
>>>>>>                     @Override
>>>>>>                     public String getKey(T t) throws Exception {
>>>>>>                         String s = (String) t.get(fieldName);
>>>>>>                         return s != null ? s :
>>>>>> UUID.randomUUID().toString();
>>>>>>                     }
>>>>>>                 })
>>>>>>                 .equalTo((KeySelector<String, String>) s -> s)
>>>>>>                 .with(new CoGroupFunction<T, String,
>>>>>> Tuple2<Boolean,T>>() {
>>>>>>                     @Override
>>>>>>                     public void coGroup(Iterable<T> records,
>>>>>> Iterable<String> ids,
>>>>>>                                         Collector<Tuple2<Boolean,T>>
>>>>>> collector) throws Exception {
>>>>>>                         boolean filterFlag = false;
>>>>>>                         for (String id : ids) {
>>>>>>                             filterFlag = true;
>>>>>>                         }
>>>>>>
>>>>>>                         for (T record : records) {
>>>>>>                             collector.collect(new Tuple2<>(filterFlag,
>>>>>> record));
>>>>>>                         }
>>>>>>                     }
>>>>>>                 });
>>>>>>
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> What I am trying to do is write a generic code that will join Avro
>>>>>> records (of different types) with String records and there is a match add a
>>>>>> filter flag. This way I can use the same code for different Avro record
>>>>>> types. But I am getting this exception-
>>>>>>
>>>>>> Exception in thread "main"
>>>>>> org.apache.flink.optimizer.CompilerException: Error translating node 'Map
>>>>>> "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED]
>>>>>> ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could
>>>>>> not write the user code wrapper class
>>>>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>>>>>> java.io.NotSerializableException:
>>>>>> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
>>>>>>     at
>>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
>>>>>>     at
>>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
>>>>>>     at
>>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>>>>>>     at
>>>>>> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>     at
>>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>     at
>>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>     at
>>>>>> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>>>>>>     at
>>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
>>>>>>     at
>>>>>> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
>>>>>>     at
>>>>>> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
>>>>>>     at
>>>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>>>>     at
>>>>>> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
>>>>>>     at
>>>>>> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>>     at
>>>>>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>>>>>> Caused by:
>>>>>> org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could
>>>>>> not write the user code wrapper class
>>>>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>>>>>> java.io.NotSerializableException: RecordsFilterer
>>>>>>     at
>>>>>> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>>>>>>     at
>>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
>>>>>>     at
>>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
>>>>>>     ... 17 more
>>>>>> Caused by: java.io.NotSerializableException: RecordsFilterer
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>     at
>>>>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
>>>>>>     at
>>>>>> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
>>>>>>     at
>>>>>> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>>>>>>     ... 19 more
>>>>>>
>>>>>>
>>>>>> Please help me understand why I get this exception and how to fix it
>>>>>> [rewrite code may be?]
>>>>>>
>>>>>> Thanks,
>>>>>> Tarandeep
>>>>>>
>>>>>>
>>>>>
>>>
>>
>

Re: NotSerializableException

Aljoscha Krettek
Nope, I think there is neither a fix nor an open issue for this right now.

On Mon, 13 Jun 2016 at 11:31 Maximilian Michels <[hidden email]> wrote:

Re: NotSerializableException

Ufuk Celebi
Here's the issue https://issues.apache.org/jira/browse/FLINK-4078

On Mon, Jun 13, 2016 at 12:27 PM, Aljoscha Krettek <[hidden email]> wrote:

> Nope, I think there is neither a fix nor an open issue for this right now.
>
> On Mon, 13 Jun 2016 at 11:31 Maximilian Michels <[hidden email]> wrote:
>>
>> Is there an issue or a fix for proper use of the ClojureCleaner in
>> CoGroup.where()?
>>
>> On Fri, Jun 10, 2016 at 8:07 AM, Aljoscha Krettek <[hidden email]>
>> wrote:
>> > Hi,
>> > yes, I was talking about a Flink bug. I forgot to mention the
>> > work-around
>> > that Stephan mentioned.
>> >
>> > On Thu, 9 Jun 2016 at 20:38 Stephan Ewen <[hidden email]> wrote:
>> >>
>> >> You can also make the KeySelector a static inner class. That should
>> >> work
>> >> as well.
>> >>
>> >> On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh <[hidden email]>
>> >> wrote:
>> >>>
>> >>> Thank you Aljoscha and Fabian for your replies.
>> >>>
>> >>> @Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm
>> >>> afraid this is a bug", I am assuming you are referring to Flink engine
>> >>> itself.
>> >>>
>> >>> @Fabian: thanks for the optimization tip.
>> >>>
>> >>> This is how I have got it working (with a hack): In my dataset, the
>> >>> join
>> >>> field/key can be null otherwise .where(fieldName) works and I don't
>> >>> get
>> >>> not-serializable exception. So I applied a MapFunction to DataSet and
>> >>> put a
>> >>> dummy value in the join field/key where it was null. Then In the join
>> >>> function, I change it back to null.
>> >>>
>> >>> Best,
>> >>> Tarandeep
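
The null-key hack described above can be sketched outside Flink as a pair of pure functions (the sentinel value is hypothetical; anything that cannot collide with a real key works):

```java
public class NullKeyWorkaround {
    // Hypothetical dummy value standing in for a null join key.
    static final String NULL_SENTINEL = "__NULL_KEY__";

    // Applied in a MapFunction before the join: replace null with the sentinel.
    static String toJoinKey(String field) {
        return field != null ? field : NULL_SENTINEL;
    }

    // Applied inside the join function: restore the original null.
    static String fromJoinKey(String key) {
        return NULL_SENTINEL.equals(key) ? null : key;
    }
}
```

The round trip is lossless as long as no real key equals the sentinel, which is the implicit assumption of the hack.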
>> >>>
>> >>> On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <[hidden email]>
>> >>> wrote:
>> >>>>
>> >>>> Hi,
>> >>>> the problem is that the KeySelector is an anonymous inner class and as
>> >>>> such has a reference to the outer RecordsFilterer object. Normally, this
>> >>>> would be rectified by the closure cleaner, but the cleaner is not used
>> >>>> in CoGroup.where(). I'm afraid this is a bug.
>> >>>>
>> >>>> Best,
>> >>>> Aljoscha
>> >>>>
>> >>>>
>> >>>> On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <[hidden email]> wrote:
>> >>>>>
>> >>>>> Hi Tarandeep,
>> >>>>>
>> >>>>> the exception suggests that Flink tries to serialize RecordsFilterer
>> >>>>> as a user function (this happens via Java serialization).
>> >>>>> I say "suggests" because the code that uses RecordsFilterer is not
>> >>>>> included.
>> >>>>>
>> >>>>> To me it looks like RecordsFilterer should not be used as a user
>> >>>>> function. It is a helper class to construct a DataSet program, so it
>> >>>>> should
>> >>>>> not be shipped for execution.
>> >>>>> You would use such a class as follows:
>> >>>>>
>> >>>>> DataSet<T> records = ...
>> >>>>> DataSet<String> filterIDs = ...
>> >>>>>
>> >>>>> RecordsFilterer rf = new RecordsFilterer();
>> >>>>> DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records,
>> >>>>> filterIDs, "myField");
>> >>>>>
>> >>>>> Regarding the join code, I would suggest an optimization.
>> >>>>> Instead of using CoGroup, I would use distinct and an OuterJoin like
>> >>>>> this:
>> >>>>>
>> >>>>> DataSet<String> distIds = filteredIds.distinct();
>> >>>>> DataSet<Tuple2<Boolean, T>> result = records
>> >>>>>     .leftOuterJoin(distIds)
>> >>>>>     .where(KEYSELECTOR)
>> >>>>>     .equalTo("*") // use full string as key
>> >>>>>     .with(JOINFUNC); // set Bool to false if right == null, true otherwise
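
Outside Flink, the semantics Fabian describes (distinct the ids, keep every record, flag it true only when its key is present) boil down to a set-membership check. A plain-Java sketch with hypothetical names:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FilterFlagSketch {
    // Mirrors distinct() + leftOuterJoin: every record key is kept and
    // paired with a flag saying whether it appeared among the filtered ids.
    static List<Map.Entry<Boolean, String>> addFilterFlag(
            List<String> recordKeys, List<String> filteredIds) {
        Set<String> distinctIds = new HashSet<>(filteredIds); // distinct()
        List<Map.Entry<Boolean, String>> result = new ArrayList<>();
        for (String key : recordKeys) {
            // flag == false corresponds to "right side == null" in the outer join
            result.add(new SimpleEntry<>(distinctIds.contains(key), key));
        }
        return result;
    }
}
```

Unlike the CoGroup version, duplicate ids cost nothing here because the set membership test is independent of how many times an id occurred.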
>> >>>>>
>> >>>>> Best, Fabian
>> >>>>>
>> >>>>> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh <[hidden email]>:
>> >>>>>>
>> >>>>>> Hi,
>> >>>>>>
>> >>>>>> I am getting NoSerializableException in this class-
>> >>>>>>
>> >>>>>> 
>> >>>>>>
>> >>>>>> public class RecordsFilterer<T extends GenericRecord> {
>> >>>>>>
>> >>>>>>     public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset,
>> >>>>>>             DataSet<String> filteredIds, String fieldName) {
>> >>>>>>         return dataset.coGroup(filteredIds)
>> >>>>>>                 .where(new KeySelector<T, String>() {
>> >>>>>>                     @Override
>> >>>>>>                     public String getKey(T t) throws Exception {
>> >>>>>>                         String s = (String) t.get(fieldName);
>> >>>>>>                         return s != null ? s : UUID.randomUUID().toString();
>> >>>>>>                     }
>> >>>>>>                 })
>> >>>>>>                 .equalTo((KeySelector<String, String>) s -> s)
>> >>>>>>                 .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
>> >>>>>>                     @Override
>> >>>>>>                     public void coGroup(Iterable<T> records, Iterable<String> ids,
>> >>>>>>                             Collector<Tuple2<Boolean, T>> collector) throws Exception {
>> >>>>>>                         boolean filterFlag = false;
>> >>>>>>                         for (String id : ids) {
>> >>>>>>                             filterFlag = true;
>> >>>>>>                         }
>> >>>>>>
>> >>>>>>                         for (T record : records) {
>> >>>>>>                             collector.collect(new Tuple2<>(filterFlag, record));
>> >>>>>>                         }
>> >>>>>>                     }
>> >>>>>>                 });
>> >>>>>>     }
>> >>>>>> }
>> >>>>>>
>> >>>>>>
>> >>>>>> What I am trying to do is write generic code that will join Avro
>> >>>>>> records (of different types) with String records and, where there is a
>> >>>>>> match, add a filter flag. This way I can use the same code for
>> >>>>>> different Avro record types. But I am getting this exception-
>> >>>>>>
>> >>>>>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>> >>>>>> Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties
>> >>>>>> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
>> >>>>>> grouped=null, unique=null] ]]': Could not write the user code wrapper class
>> >>>>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>> >>>>>> java.io.NotSerializableException:
>> >>>>>> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
>> >>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
>> >>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
>> >>>>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>> >>>>>>     at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>> >>>>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>> >>>>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>> >>>>>>     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>> >>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
>> >>>>>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
>> >>>>>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
>> >>>>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>> >>>>>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
>> >>>>>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
>> >>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>> >>>>>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>> >>>>>> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException:
>> >>>>>> Could not write the user code wrapper class
>> >>>>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>> >>>>>> java.io.NotSerializableException: RecordsFilterer
>> >>>>>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>> >>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
>> >>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
>> >>>>>>     ... 17 more
>> >>>>>> Caused by: java.io.NotSerializableException: RecordsFilterer
>> >>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> >>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> >>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> >>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> >>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> >>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> >>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> >>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> >>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> >>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> >>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> >>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> >>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> >>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> >>>>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
>> >>>>>>     at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
>> >>>>>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>> >>>>>>     ... 19 more
>> >>>>>>
>> >>>>>>
>> >>>>>> Please help me understand why I get this exception and how to fix it
>> >>>>>> [rewrite the code, maybe?]
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Tarandeep
>> >>>>>>
>> >>>>>>
>> >>>>>
>> >>>
>> >>
>> >