The implementation of the CoGroupFunction is not serializable

Kaepke, Marc
Hi guys,

I get no errors when I call Graph.groupReduceOnEdges() directly in main(String[] args), but the same call inside its own class throws an exception.

DataSet<Vertex<Double, CustomVertexValue<Double, Double, Double>>> vertexDataSet = graph.groupReduceOnEdges(
    new EdgesFunctionWithVertexValue<Double, Double, Double, Vertex<Double, CustomVertexValue<Double, Double, Double>>>() {
        @Override
        public void iterateEdges(Vertex<Double, Double> vertex,
                                 Iterable<Edge<Double, Double>> edges,
                                 Collector<Vertex<Double, CustomVertexValue<Double, Double, Double>>> out) throws Exception {
            CustomVertexValue<Double, Double, Double> customVertexValue = new CustomVertexValue<>(vertex.getValue());
            for (Edge edge : edges) {
                customVertexValue.addEdge(edge);
            }
            out.collect(new Vertex<>(vertex.getId(), customVertexValue));
        }
    }, EdgeDirection.ALL);

----------
20:07:09,500 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class org.apache.flink.gelly.HelperClasses.CustomVertexValue is not a valid POJO type
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the CoGroupFunction is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
    at org.apache.flink.api.java.DataSet.clean(DataSet.java:186)
    at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets$CoGroupOperatorSetsPredicate$CoGroupOperatorWithoutFunction.with(CoGroupOperator.java:619)
    at org.apache.flink.graph.Graph.groupReduceOnEdges(Graph.java:1005)
    at org.apache.flink.gelly.examples.gellyTutorials.TrialAndError$Dummy.<init>(TrialAndError.java:69)
    at org.apache.flink.gelly.examples.gellyTutorials.TrialAndError.main(TrialAndError.java:49)
Caused by: java.io.NotSerializableException: org.apache.flink.gelly.examples.gellyTutorials.TrialAndError$Dummy
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 5 more

Process finished with exit code 1
----------


Does anyone know a solution for my issue?


Best,
Marc
Re: The implementation of the CoGroupFunction is not serializable

Kaepke, Marc
Hi Greg,

Thanks!
Should all inner classes be static?


Best,
Marc

On 01.05.2017 at 00:21, Greg Hogan <[hidden email]> wrote:

Hi Marc,

These errors are usually caused by an inner class that should be static.

Greg
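
The stack trace fits this explanation: the NotSerializableException names TrialAndError$Dummy, the class whose constructor creates the anonymous function, not the function itself. The following is a minimal sketch in plain Java, without Flink, of how that can happen. All names in it (SerializationDemo, Holder, Function, Append, isSerializable) are hypothetical stand-ins and not part of Flink or of the code above; Holder plays the role of Dummy.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    /** Hypothetical stand-in for a user function that the framework must serialize. */
    public interface Function extends Serializable {
        String apply(String input);
    }

    /** Deliberately NOT Serializable, like TrialAndError$Dummy in the stack trace. */
    public static class Holder {
        private final String suffix = "!";

        /**
         * Anonymous class created in an instance context. It reads a field of
         * the enclosing Holder, so it keeps a hidden reference to that Holder,
         * and Java serialization then tries to write the Holder as well.
         */
        public Function anonymousFunction() {
            return new Function() {
                @Override
                public String apply(String input) {
                    return input + suffix;
                }
            };
        }

        /** Static nested class: no hidden outer reference, state is copied in explicitly. */
        public static class Append implements Function {
            private final String suffix;

            public Append(String suffix) {
                this.suffix = suffix;
            }

            @Override
            public String apply(String input) {
                return input + suffix;
            }
        }

        public Function staticFunction() {
            return new Append(suffix);
        }
    }

    /** Returns true if Java serialization can write the object, false if it cannot. */
    public static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Holder holder = new Holder();
        System.out.println("anonymous inner class: " + isSerializable(holder.anonymousFunction()));
        System.out.println("static nested class:   " + isSerializable(holder.staticFunction()));
    }
}
```

Under these assumptions, the anonymous function fails the check and the static nested one passes. The rule of thumb this suggests is narrower than "all inner classes": what matters is that the function objects handed to the framework do not drag along a non-serializable enclosing instance. Making the function a static nested (or top-level) class, or making the enclosing class Serializable, are the two usual ways out.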


On Apr 30, 2017, at 2:13 PM, Kaepke, Marc <[hidden email]> wrote:
