Re: Another serialization error

Posted by Flavio Pompermaier on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Another-serialization-error-tp6952p6956.html

Hi Robert,
in this specific case the classes involved are:
  • Tuple3<String, String, IndexAttributeToExpand> (IndexAttributeToExpand is a POJO extending another class, and neither of them implements equals and hashCode)
  • Tuple3<String,String, TreeNode<String, Map<String, Set<String>>>> (TreeNode is a POJO containing other TreeNodes, and it doesn't implement equals and hashCode)
  • I have now added hashCode and equals to both classes to align them with the POJO policies. However, the job also finished correctly after I stopped and restarted the cluster... Usually, when I get a strange serialization exception, I stop and restart the cluster and everything works.
The TreeNode class (the recursive one) is actually the following:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

import com.fasterxml.jackson.annotation.JsonIgnore;

public class TreeNode<K,V> implements Serializable {

private static final long serialVersionUID = 1L;
private int level = 0;

private String uuid;
private K key;
private V value;
@JsonIgnore
private TreeNode<K,V> parent;
private List<TreeNode<K,V>> children;
@JsonIgnore
private HashMap<K, List<TreeNode<K,V>>> lookup;

    public TreeNode(K key, V value) {
    this.level = 0;
    this.key = key;
    this.uuid = UUID.randomUUID().toString();
        this.value = value;
        this.children = new LinkedList<TreeNode<K,V>>();
        List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K,V>>();
        thisAsList.add(this);
        this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
        this.lookup.put(key, thisAsList);
    }
    
    public TreeNode<K,V> addChild(K key, V value) {
    TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
    childNode.level = level +1;
        childNode.parent = this;
        childNode.lookup = lookup;
        childNode.uuid = UUID.randomUUID().toString();
        this.children.add(childNode);
        List<TreeNode<K, V>> l = lookup.get(childNode.key);
        if(l==null){
        l = new ArrayList<TreeNode<K,V>>();
        lookup.put(childNode.key, l);
        }
        l.add(childNode);
        return childNode;
    }
    
    public boolean isLeaf() {
return children.isEmpty() ;
}
public int getLevel() {
return level;
}
public TreeNode<K,V> getParent() {
return parent;
}
public V getValue() {
return value;
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
public List<TreeNode<K,V>> getChildren() {
return children;
}
public List<TreeNode<K, V>> getNodesByKey(K key) {
return lookup.get(key);
}
public K getKey() {
return key;
}
public List<TreeNode<K,V>> getLeafs() {
List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
getLeafs(ret);
return ret;
}
private void getLeafs(List<TreeNode<K, V>> ret) {
if(children.isEmpty())
ret.add(this);
for (TreeNode<K, V> child : children) {
child.getLeafs(ret);
}
}

@Override
public String toString() {
return toString(true);
}
public String toString(boolean withChildren) {
if(key==null)
return super.toString();
StringBuffer ret = new StringBuffer();
for (int i = 0; i < level; i++) {
ret.append(" >");
}
ret.append(" " +key.toString());
if(withChildren){
for (TreeNode<K, V> child : children) {
ret.append("\n").append(child.toString());
}
}
return ret.toString();
}

public void setValue(V value) {
this.value = value;
}

public void remove(List<TreeNode<K, V>> nodes) {
for (TreeNode<K, V> n : nodes) {
removeChildren(n);
}
for (TreeNode<K, V> n : nodes) {
TreeNode<K, V> parent = n.getParent();
if(parent==null)
continue;
parent.children.remove(n);
}
}

private void removeChildren(TreeNode<K, V> node) {
lookup.remove(node.getUuid());
if(node.children.isEmpty())
return;
Iterator<TreeNode<K, V>> it = node.children.iterator();
while (it.hasNext()) {
TreeNode<K, V> child = (TreeNode<K, V>) it.next();
removeChildren(child);
it.remove();
}
}

public void clear() {
    this.key = null;
        this.value = null;
        this.uuid = null;
        for (TreeNode<K, V> child : children) {
        child.clear();
}
        this.children.clear();
        this.lookup.clear();
}

public TreeNode<K, V> getNodeById(K key, String uuid) {
List<TreeNode<K, V>> nodes = getNodesByKey(key);
for (TreeNode<K, V> treeNode : nodes) {
if(uuid.equals(treeNode.getUuid()))
return treeNode;
}
return null;
}

public HashMap<K, List<TreeNode<K, V>>> getLookup() {
return lookup;
}

public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
this.lookup = lookup;
}

public void setLevel(int level) {
this.level = level;
}

public void setKey(K key) {
this.key = key;
}

public void setParent(TreeNode<K, V> parent) {
this.parent = parent;
}

public void setChildren(List<TreeNode<K, V>> children) {
this.children = children;
}
}

On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <[hidden email]> wrote:
Hi Flavio,

which datatype are you using?

On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
over the last few days we've run a lot of Flink jobs, and from time to time (apparently at random) a different exception arises during their execution...
I hope one of them can help find the source of the problem. This time the exception is:

An error occurred while reading the next record.
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
     at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
     at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.UTFDataFormatException: malformed input around byte 42
     at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
     at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
     at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
     at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130

Could this error be caused by a missing implementation of hashCode() and equals()?

Thanks in advance,
Flavio