Another serialization error

Flavio Pompermaier
Hi all,
over the past few days we've run a lot of Flink jobs, and from time to time (apparently at random) a different exception arises during their execution.
I hope one of them helps in finding the source of the problem. This time the exception is:

An error occurred while reading the next record.
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
     at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
     at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.UTFDataFormatException: malformed input around byte 42
     at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
     at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
     at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
     at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)

Could this error be caused by a missing implementation of hashCode() and equals()?

Thanks in advance,
Flavio
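
For context on the `UTFDataFormatException` in the trace above: the following is a minimal sketch in plain `java.io`, not Flink code. It assumes that Flink's `readUTF` follows the same modified UTF-8 format as the standard `DataInput` contract. The class name `CorruptedUtfDemo` and its helpers are hypothetical. It shows that a string written with `writeUTF` reads back fine, while the same bytes with one payload byte overwritten make `readUTF` fail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.io.UncheckedIOException;

public class CorruptedUtfDemo {

    /** Encodes a string as a 2-byte length prefix followed by modified UTF-8 bytes. */
    public static byte[] encode(String s) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(s);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Decodes the bytes, or returns a "FAILED: ..." marker if they are malformed. */
    public static String decode(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return in.readUTF();
        } catch (UTFDataFormatException e) {
            return "FAILED: " + e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Overwrites one byte with 0xFF, which is never a valid lead byte in modified UTF-8. */
    public static byte[] corrupt(byte[] bytes, int index) {
        byte[] copy = bytes.clone();
        copy[index] = (byte) 0xFF;
        return copy;
    }

    public static void main(String[] args) {
        byte[] good = encode("hello flink");
        System.out.println(decode(good));
        // Index 5 is inside the payload (indices 0 and 1 hold the length prefix).
        System.out.println(decode(corrupt(good, 5)));
    }
}
```

The point of the sketch is only that this exception indicates the reader is looking at bytes that are not a validly encoded string, for example because the data was damaged or the reader is positioned at the wrong offset. It says nothing about where in the Flink stack that happens.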

Re: Another serialization error

rmetzger0
Hi Flavio,

Which data types are you using?


Re: Another serialization error

Flavio Pompermaier
Hi Robert,
in this specific case the classes involved are:
  • Tuple3<String, String, IndexAttributeToExpand> (IndexAttributeToExpand is a POJO extending another class, and neither of them implements equals() and hashCode())
  • Tuple3<String, String, TreeNode<String, Map<String, Set<String>>>> (TreeNode is a POJO containing other TreeNodes, and it doesn't implement equals() and hashCode())
I've now added hashCode() and equals() to both classes to comply with the POJO rules; however, the job finished correctly after stopping and restarting the cluster. Usually, when I get a strange serialization exception, I stop and restart the cluster and everything works.
The TreeNode class (the recursive one) is actually the following:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

import com.fasterxml.jackson.annotation.JsonIgnore;

public class TreeNode<K,V> implements Serializable {

    private static final long serialVersionUID = 1L;
    private int level = 0;

    private String uuid;
    private K key;
    private V value;
    @JsonIgnore
    private TreeNode<K,V> parent;
    private List<TreeNode<K,V>> children;
    @JsonIgnore
    private HashMap<K, List<TreeNode<K,V>>> lookup;

    public TreeNode(K key, V value) {
        this.level = 0;
        this.key = key;
        this.uuid = UUID.randomUUID().toString();
        this.value = value;
        this.children = new LinkedList<TreeNode<K,V>>();
        List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K,V>>();
        thisAsList.add(this);
        this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
        this.lookup.put(key, thisAsList);
    }

    public TreeNode<K,V> addChild(K key, V value) {
        TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
        childNode.level = level + 1;
        childNode.parent = this;
        childNode.lookup = lookup;
        childNode.uuid = UUID.randomUUID().toString();
        this.children.add(childNode);
        List<TreeNode<K, V>> l = lookup.get(childNode.key);
        if (l == null) {
            l = new ArrayList<TreeNode<K,V>>();
            lookup.put(childNode.key, l);
        }
        l.add(childNode);
        return childNode;
    }

    public boolean isLeaf() {
        return children.isEmpty();
    }

    public int getLevel() {
        return level;
    }

    public TreeNode<K,V> getParent() {
        return parent;
    }

    public V getValue() {
        return value;
    }

    public String getUuid() {
        return uuid;
    }

    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    public List<TreeNode<K,V>> getChildren() {
        return children;
    }

    public List<TreeNode<K, V>> getNodesByKey(K key) {
        return lookup.get(key);
    }

    public K getKey() {
        return key;
    }

    public List<TreeNode<K,V>> getLeafs() {
        List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
        getLeafs(ret);
        return ret;
    }

    private void getLeafs(List<TreeNode<K, V>> ret) {
        if (children.isEmpty())
            ret.add(this);
        for (TreeNode<K, V> child : children) {
            child.getLeafs(ret);
        }
    }

    @Override
    public String toString() {
        return toString(true);
    }

    public String toString(boolean withChildren) {
        if (key == null)
            return super.toString();
        StringBuffer ret = new StringBuffer();
        for (int i = 0; i < level; i++) {
            ret.append(" >");
        }
        ret.append(" " + key.toString());
        if (withChildren) {
            for (TreeNode<K, V> child : children) {
                ret.append("\n").append(child.toString());
            }
        }
        return ret.toString();
    }

    public void setValue(V value) {
        this.value = value;
    }

    public void remove(List<TreeNode<K, V>> nodes) {
        for (TreeNode<K, V> n : nodes) {
            removeChildren(n);
        }
        for (TreeNode<K, V> n : nodes) {
            TreeNode<K, V> parent = n.getParent();
            if (parent == null)
                continue;
            parent.children.remove(n);
        }
    }

    private void removeChildren(TreeNode<K, V> node) {
        lookup.remove(node.getUuid());
        if (node.children.isEmpty())
            return;
        Iterator<TreeNode<K, V>> it = node.children.iterator();
        while (it.hasNext()) {
            TreeNode<K, V> child = (TreeNode<K, V>) it.next();
            removeChildren(child);
            it.remove();
        }
    }

    public void clear() {
        this.key = null;
        this.value = null;
        this.uuid = null;
        for (TreeNode<K, V> child : children) {
            child.clear();
        }
        this.children.clear();
        this.lookup.clear();
    }

    public TreeNode<K, V> getNodeById(K key, String uuid) {
        List<TreeNode<K, V>> nodes = getNodesByKey(key);
        for (TreeNode<K, V> treeNode : nodes) {
            if (uuid.equals(treeNode.getUuid()))
                return treeNode;
        }
        return null;
    }

    public HashMap<K, List<TreeNode<K, V>>> getLookup() {
        return lookup;
    }

    public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
        this.lookup = lookup;
    }

    public void setLevel(int level) {
        this.level = level;
    }

    public void setKey(K key) {
        this.key = key;
    }

    public void setParent(TreeNode<K, V> parent) {
        this.parent = parent;
    }

    public void setChildren(List<TreeNode<K, V>> children) {
        this.children = children;
    }
}
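
The message above does not show the equals() and hashCode() methods that were added, so the following is only a sketch of one possible approach, under the assumption that a node's identity is its uuid. The class `UuidNode` is hypothetical and much smaller than `TreeNode`. The design point it illustrates: in a structure where a child points back to its parent, methods that compare or hash every field (including `parent` and `children`) would recurse around the parent/child cycle, whereas methods based on the uuid alone do not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class UuidNode {

    private final String uuid = UUID.randomUUID().toString();
    private UuidNode parent;                       // back-reference: creates a cycle
    private final List<UuidNode> children = new ArrayList<>();

    /** Creates a child, links it to this node in both directions, and returns it. */
    public UuidNode addChild() {
        UuidNode child = new UuidNode();
        child.parent = this;
        children.add(child);
        return child;
    }

    public String getUuid() {
        return uuid;
    }

    public UuidNode getParent() {
        return parent;
    }

    public List<UuidNode> getChildren() {
        return children;
    }

    /** Two nodes are equal when their uuids match; parent and children are ignored. */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof UuidNode)) {
            return false;
        }
        return uuid.equals(((UuidNode) other).uuid);
    }

    /** Consistent with equals(): derived from the uuid only, so it never walks the tree. */
    @Override
    public int hashCode() {
        return uuid.hashCode();
    }

    public static void main(String[] args) {
        UuidNode root = new UuidNode();
        UuidNode child = root.addChild();
        System.out.println(root.equals(child));
        System.out.println(root.getChildren().contains(child));
    }
}
```

Whether uuid-based identity is appropriate depends on how the nodes are used as keys; this is an illustration, not the implementation from the thread.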


Re: Another serialization error

rmetzger0
Hi Flavio,
thank you for providing additional details.
I don't think that missing hashCode() / equals() implementations would cause such an error. They can cause wrong sorting or partitioning of the data, but the serialization should still work properly.
I suspect the issue is somewhere in the serialization stack.
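
A small illustration of the distinction drawn above, using plain Java rather than Flink, so it is only an analogy: `HashMap` stands in for hash-based grouping and Java's built-in object serialization stands in for the serializer. The class `Attr` and the helper names are hypothetical. A class without equals() and hashCode() still survives a serialization round trip, but two instances with the same content are treated as different keys.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class EqualsVsSerializationDemo {

    /** Hypothetical value class that deliberately omits equals() and hashCode(). */
    public static class Attr implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;

        public Attr(String name) {
            this.name = name;
        }
    }

    /** Serializes the object to bytes and reads it back. */
    public static Attr roundTrip(Attr attr) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(attr);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (Attr) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Groups the given objects by key and returns the number of distinct groups. */
    public static int countGroups(Attr... attrs) {
        Map<Attr, Integer> groups = new HashMap<>();
        for (Attr attr : attrs) {
            groups.merge(attr, 1, Integer::sum);
        }
        return groups.size();
    }

    public static void main(String[] args) {
        // Serialization does not depend on equals()/hashCode().
        System.out.println(roundTrip(new Attr("x")).name);
        // Grouping does: the default identity-based equals() keeps these apart.
        System.out.println(countGroups(new Attr("x"), new Attr("x")));
    }
}
```

The round trip returns the original field value, while the two equal-looking instances end up in separate groups. That matches the expectation that missing equals()/hashCode() leads to wrong grouping results rather than to corrupted bytes.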

Are you registering any custom kryo serializers?



From your past emails, you saw the following different exceptions:

A)  Caused by: java.io.UTFDataFormatException: malformed input around byte 42
B)  Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
C)  Caused by: java.lang.ArrayIndexOutOfBoundsException: -2

Were they all caused by the same job, or different ones?



Re: Another serialization error

Flavio Pompermaier
I got those exceptions running 3 different types of jobs. I should have tracked which job produced which error... my bad!
However, the most problematic job is the last one, where I run a series of jobs one after the other (calling env.execute() in a for loop).
If you want, I can share my code with you (in private for the moment, because it's not public yet) or the dashboard screen via Skype while the jobs are running.


Re: Another serialization error

Flavio Pompermaier
Ah sorry, I forgot to mention that I don't use any custom Kryo serializers.

On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <[hidden email]> wrote:
Hi Robert,
in this specific case the classes involved are:
  • Tuple3<String, String, IndexAttributeToExpand> (IndexAttributeToExpand is a POJO extending another class, and neither of them implements equals() and hashCode())
  • Tuple3<String, String, TreeNode<String, Map<String, Set<String>>>> (TreeNode is a POJO containing other TreeNodes, and it doesn't implement equals() and hashCode())
I've now added hashCode() and equals() to both classes to comply with the POJO rules; however, the job finished correctly after stopping and restarting the cluster. Usually, when I get a strange serialization exception, I stop and restart the cluster and everything works.
The TreeNode class (the recursive one) is actually the following:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

import com.fasterxml.jackson.annotation.JsonIgnore;

public class TreeNode<K,V> implements Serializable {

    private static final long serialVersionUID = 1L;
    private int level = 0;

    private String uuid;
    private K key;
    private V value;
    @JsonIgnore
    private TreeNode<K,V> parent;
    private List<TreeNode<K,V>> children;
    @JsonIgnore
    private HashMap<K, List<TreeNode<K,V>>> lookup;

    public TreeNode(K key, V value) {
        this.level = 0;
        this.key = key;
        this.uuid = UUID.randomUUID().toString();
        this.value = value;
        this.children = new LinkedList<TreeNode<K,V>>();
        List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K,V>>();
        thisAsList.add(this);
        this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
        this.lookup.put(key, thisAsList);
    }

    public TreeNode<K,V> addChild(K key, V value) {
        TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
        childNode.level = level + 1;
        childNode.parent = this;
        childNode.lookup = lookup;
        childNode.uuid = UUID.randomUUID().toString();
        this.children.add(childNode);
        List<TreeNode<K, V>> l = lookup.get(childNode.key);
        if (l == null) {
            l = new ArrayList<TreeNode<K,V>>();
            lookup.put(childNode.key, l);
        }
        l.add(childNode);
        return childNode;
    }

    public boolean isLeaf() {
        return children.isEmpty();
    }

    public int getLevel() {
        return level;
    }

    public TreeNode<K,V> getParent() {
        return parent;
    }

    public V getValue() {
        return value;
    }

    public String getUuid() {
        return uuid;
    }

    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    public List<TreeNode<K,V>> getChildren() {
        return children;
    }

    public List<TreeNode<K, V>> getNodesByKey(K key) {
        return lookup.get(key);
    }

    public K getKey() {
        return key;
    }

    public List<TreeNode<K,V>> getLeafs() {
        List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
        getLeafs(ret);
        return ret;
    }

    private void getLeafs(List<TreeNode<K, V>> ret) {
        if (children.isEmpty())
            ret.add(this);
        for (TreeNode<K, V> child : children) {
            child.getLeafs(ret);
        }
    }

    @Override
    public String toString() {
        return toString(true);
    }

    public String toString(boolean withChildren) {
        if (key == null)
            return super.toString();
        StringBuffer ret = new StringBuffer();
        for (int i = 0; i < level; i++) {
            ret.append(" >");
        }
        ret.append(" " + key.toString());
        if (withChildren) {
            for (TreeNode<K, V> child : children) {
                ret.append("\n").append(child.toString());
            }
        }
        return ret.toString();
    }

    public void setValue(V value) {
        this.value = value;
    }

    public void remove(List<TreeNode<K, V>> nodes) {
        for (TreeNode<K, V> n : nodes) {
            removeChildren(n);
        }
        for (TreeNode<K, V> n : nodes) {
            TreeNode<K, V> parent = n.getParent();
            if (parent == null)
                continue;
            parent.children.remove(n);
        }
    }

    private void removeChildren(TreeNode<K, V> node) {
        lookup.remove(node.getUuid());
        if (node.children.isEmpty())
            return;
        Iterator<TreeNode<K, V>> it = node.children.iterator();
        while (it.hasNext()) {
            TreeNode<K, V> child = (TreeNode<K, V>) it.next();
            removeChildren(child);
            it.remove();
        }
    }

    public void clear() {
        this.key = null;
        this.value = null;
        this.uuid = null;
        for (TreeNode<K, V> child : children) {
            child.clear();
        }
        this.children.clear();
        this.lookup.clear();
    }

    public TreeNode<K, V> getNodeById(K key, String uuid) {
        List<TreeNode<K, V>> nodes = getNodesByKey(key);
        for (TreeNode<K, V> treeNode : nodes) {
            if (uuid.equals(treeNode.getUuid()))
                return treeNode;
        }
        return null;
    }

    public HashMap<K, List<TreeNode<K, V>>> getLookup() {
        return lookup;
    }

    public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
        this.lookup = lookup;
    }

    public void setLevel(int level) {
        this.level = level;
    }

    public void setKey(K key) {
        this.key = key;
    }

    public void setParent(TreeNode<K, V> parent) {
        this.parent = parent;
    }

    public void setChildren(List<TreeNode<K, V>> children) {
        this.children = children;
    }
}

On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <[hidden email]> wrote:
Hi Flavio,

which datatype are you using?

On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
over the last few days we've run a lot of Flink jobs and from time to time (apparently at random) a different exception arises during their execution.
I hope one of them can help in finding the source of the problem. This time the exception is:

An error occurred while reading the next record.
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
     at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
     at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.UTFDataFormatException: malformed input around byte 42
     at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
     at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
     at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
     at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130

Could this error be caused by a missing implementation of hashCode() and equals()?

Thanks in advance,
Flavio





Re: Another serialization error

rmetzger0
Is the last one C or A?

How often does it fail (every nth run)? Does it always fail at the same execute() call, or at different ones?
Is it always the exact same exception, or different ones?
Does the error behave differently depending on the input data?

Sorry for asking so many questions, but these errors can have many causes and just searching the code for potential issues can take a lot of time ;)

On Tue, May 17, 2016 at 12:47 PM, Flavio Pompermaier <[hidden email]> wrote:
Ah sorry, I forgot to mention that I don't use any custom Kryo serializers.

On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <[hidden email]> wrote:
I got those exceptions running 3 different types of jobs. I should have tracked which job produced which error, my bad!
However, the most problematic job is the last one, where I run a series of jobs one after the other (calling env.execute() in a for loop).
If you want, I can share my code with you (in private for the moment, because it's not public yet) or the dashboard screen via Skype while the jobs are running.


On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <[hidden email]> wrote:
Hi Flavio,
thank you for providing additional details.
I don't think that missing hashCode() / equals() implementations cause such an error. They can cause wrong sorting or partitioning of the data, but the serialization should still work properly.
I suspect the issue is somewhere in the serialization stack.
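
The distinction can be seen with plain JDK classes. This is a minimal sketch, not Flink code: the Key class and the helper names are hypothetical, Java serialization stands in for Flink's own serializers, and a HashMap stands in for hash-based grouping. A type without equals()/hashCode() survives a serialization round trip unchanged, yet two logically identical keys land in different groups.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class MissingEqualsDemo {

    /** Hypothetical key type that relies on Object's identity-based equals() and hashCode(). */
    static final class Key implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;

        Key(String name) {
            this.name = name;
        }
    }

    /** Serializes and deserializes a key with standard Java serialization. */
    static Key roundTrip(Key key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(key);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Key) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns how many groups a hash-based grouping creates for the two keys. */
    static int groupCount(Key a, Key b) {
        Map<Key, Integer> counts = new HashMap<>();
        counts.merge(a, 1, Integer::sum);
        counts.merge(b, 1, Integer::sum);
        return counts.size();
    }

    public static void main(String[] args) {
        Key original = new Key("x");
        Key copy = roundTrip(original);
        System.out.println(copy.name);                  // the field content is intact
        System.out.println(groupCount(original, copy)); // but the two keys are not grouped together
    }
}
```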

Are you registering any custom kryo serializers?



From your past emails, you saw the following different exceptions:

A)  Caused by: java.io.UTFDataFormatException: malformed input around byte 42
B)  Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
C)  Caused by: java.lang.ArrayIndexOutOfBoundsException: -2

Were they all caused by the same job, or different ones?




Re: Another serialization error

Flavio Pompermaier
Don't worry Robert,
I know how hard it is to debug such errors :)
I hope that the combination of these 3 errors is somehow related. Anyway, these are the answers:

  • The job (composed of 16 sub-jobs) fails randomly but, usually, the first sub-job after a cluster restart runs successfully
  • In this job I saw all of A, B and C (but after changing the parallelism)
  • Yes, the error behaves differently depending on the input data (actually, on the default parallelism and the number of slots in the cluster)
One more interesting thing I fixed in my code that could (maybe?) be the cause of B and C (but not A, because that happened after this problem was fixed):
I'm reading and writing data from some Parquet-Thrift directories (using the Hadoop input and output formats ParquetThriftInputFormat and ParquetThriftOutputFormat).
In one of the 3 jobs I write a dataset to a Parquet-Thrift directory using ds.output(), after decreasing the parallelism and the number of slots from M to N.
The first N Parquet files were overwritten (as expected), but the last M-N were not removed (I was expecting the Parquet-Thrift directory to be managed as a single unit).
Then, when I read from that directory in the next job, I discovered that the job was actually reading all the files in that folder (I was convinced that, even though the M-N files
were left in that dir, some index file, e.g. _metadata, would act as the entry point for the files in that folder).
I don't know whether this could be a cause of such errors, but I'm reporting it anyway for the sake of completeness, hoping that
this real-life debugging story could be helpful to someone else using Parquet on Flink :)

Thanks for the support,

Flavio
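
The leftover-file effect described above can be reproduced on a local directory with plain java.nio. This is only a sketch under stated assumptions: the part-<i> file names, the StalePartFiles class and its helpers are hypothetical, and the local filesystem stands in for the Parquet-Thrift output directory. Writing with parallelism 4 and then 2 leaves part-2 and part-3 behind unless the directory is cleared before the second write.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class StalePartFiles {

    static Path newTempDir() {
        try {
            return Files.createTempDirectory("parts");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes part-0 .. part-(parallelism-1), overwriting files with the same name. */
    static void writeParts(Path dir, int parallelism) {
        try {
            for (int i = 0; i < parallelism; i++) {
                byte[] content = ("parallelism " + parallelism).getBytes(StandardCharsets.UTF_8);
                Files.write(dir.resolve("part-" + i), content);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deletes every regular file directly inside the directory. */
    static void clear(Path dir) {
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                if (Files.isRegularFile(entry)) {
                    Files.delete(entry);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the sorted file names in the directory. */
    static List<String> list(Path dir) {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                names.add(entry.getFileName().toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        Path dir = newTempDir();
        writeParts(dir, 4);
        writeParts(dir, 2);            // overwrites part-0 and part-1 only
        System.out.println(list(dir)); // stale part-2 and part-3 are still listed

        clear(dir);                    // remove old output before writing again
        writeParts(dir, 2);
        System.out.println(list(dir));
    }
}
```

A reader that simply picks up every file in the directory would mix the two runs in the first case, which matches the behaviour reported above.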




Re: Another serialization error

Flavio Pompermaier
I tried to debug my application from Eclipse and I got an infinite recursive call in the TypeExtractor during the analysis of TreeNode (I'm using Flink 1.0.2):

Exception in thread "main" java.lang.StackOverflowError
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1482)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1464)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:736)
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)

Why doesn't this happen on the cluster?
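
For readers unfamiliar with this failure mode: any analysis that walks the fields of a type must remember which types it has already seen, otherwise a self-referential class sends it into endless recursion. The sketch below is not Flink's TypeExtractor and does not explain the difference between Eclipse and the cluster. It is a reflection-only illustration with hypothetical classes (Node, Payload), and it ignores generic type arguments, which is where the TreeNode recursion actually passes through.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashSet;
import java.util.Set;

final class RecursiveTypeWalk {

    /** Hypothetical self-referential type, loosely modelled on a tree node. */
    static final class Node {
        String key;
        Payload value;
        Node parent;
    }

    /** Hypothetical payload that points back to its node, closing a cycle. */
    static final class Payload {
        int size;
        Node owner;
    }

    /** Collects the non-JDK classes reachable through instance fields. */
    static Set<Class<?>> reachableTypes(Class<?> root) {
        Set<Class<?>> visited = new LinkedHashSet<>();
        visit(root, visited);
        return visited;
    }

    private static void visit(Class<?> type, Set<Class<?>> visited) {
        // Primitives and JDK classes are treated as leaves here.
        if (type.isPrimitive() || type.getName().startsWith("java.")) {
            return;
        }
        // The guard: add() returns false for a type that was already analysed.
        // Without this check, Node -> Payload -> Node -> ... would never end.
        if (!visited.add(type)) {
            return;
        }
        for (Field field : type.getDeclaredFields()) {
            if (!Modifier.isStatic(field.getModifiers())) {
                visit(field.getType(), visited);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(reachableTypes(Node.class));
    }
}
```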



On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <[hidden email]> wrote:
Hi Robert,
in this specific case the classes involved are:
  • Tuple3<String, String, IndexAttributeToExpand> (IndexAttributeToExpand is a POJO extending another class, and neither of them implements equals and hashCode)
  • Tuple3<String,String, TreeNode<String, Map<String, Set<String>>>> (TreeNode is a POJO containing other TreeNodes, and it doesn't implement equals and hashCode)
I've now added hashCode and equals to both classes to be aligned with the POJO rules. However, the job finished correctly after stopping and restarting the cluster. Usually, when I get a strange serialization exception, I stop and restart the cluster and everything works.
The TreeNode class (the recursive one) is the following:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

import com.fasterxml.jackson.annotation.JsonIgnore;

public class TreeNode<K,V> implements Serializable {

    private static final long serialVersionUID = 1L;
    private int level = 0;

    private String uuid;
    private K key;
    private V value;
    @JsonIgnore
    private TreeNode<K,V> parent;
    private List<TreeNode<K,V>> children;
    @JsonIgnore
    private HashMap<K, List<TreeNode<K,V>>> lookup;

    public TreeNode(K key, V value) {
        this.level = 0;
        this.key = key;
        this.uuid = UUID.randomUUID().toString();
        this.value = value;
        this.children = new LinkedList<TreeNode<K,V>>();
        List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K,V>>();
        thisAsList.add(this);
        this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
        this.lookup.put(key, thisAsList);
    }

    public TreeNode<K,V> addChild(K key, V value) {
        TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
        childNode.level = level + 1;
        childNode.parent = this;
        childNode.lookup = lookup;
        childNode.uuid = UUID.randomUUID().toString();
        this.children.add(childNode);
        List<TreeNode<K, V>> l = lookup.get(childNode.key);
        if (l == null) {
            l = new ArrayList<TreeNode<K,V>>();
            lookup.put(childNode.key, l);
        }
        l.add(childNode);
        return childNode;
    }

    public boolean isLeaf() {
        return children.isEmpty();
    }

    public int getLevel() {
        return level;
    }

    public TreeNode<K,V> getParent() {
        return parent;
    }

    public V getValue() {
        return value;
    }

    public String getUuid() {
        return uuid;
    }

    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    public List<TreeNode<K,V>> getChildren() {
        return children;
    }

    public List<TreeNode<K, V>> getNodesByKey(K key) {
        return lookup.get(key);
    }

    public K getKey() {
        return key;
    }

    public List<TreeNode<K,V>> getLeafs() {
        List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
        getLeafs(ret);
        return ret;
    }

    private void getLeafs(List<TreeNode<K, V>> ret) {
        if (children.isEmpty())
            ret.add(this);
        for (TreeNode<K, V> child : children) {
            child.getLeafs(ret);
        }
    }

    @Override
    public String toString() {
        return toString(true);
    }

    public String toString(boolean withChildren) {
        if (key == null)
            return super.toString();
        StringBuffer ret = new StringBuffer();
        for (int i = 0; i < level; i++) {
            ret.append(" >");
        }
        ret.append(" " + key.toString());
        if (withChildren) {
            for (TreeNode<K, V> child : children) {
                ret.append("\n").append(child.toString());
            }
        }
        return ret.toString();
    }

    public void setValue(V value) {
        this.value = value;
    }

    public void remove(List<TreeNode<K, V>> nodes) {
        for (TreeNode<K, V> n : nodes) {
            removeChildren(n);
        }
        for (TreeNode<K, V> n : nodes) {
            TreeNode<K, V> parent = n.getParent();
            if (parent == null)
                continue;
            parent.children.remove(n);
        }
    }

    private void removeChildren(TreeNode<K, V> node) {
        lookup.remove(node.getUuid());
        if (node.children.isEmpty())
            return;
        Iterator<TreeNode<K, V>> it = node.children.iterator();
        while (it.hasNext()) {
            TreeNode<K, V> child = (TreeNode<K, V>) it.next();
            removeChildren(child);
            it.remove();
        }
    }

    public void clear() {
        this.key = null;
        this.value = null;
        this.uuid = null;
        for (TreeNode<K, V> child : children) {
            child.clear();
        }
        this.children.clear();
        this.lookup.clear();
    }

    public TreeNode<K, V> getNodeById(K key, String uuid) {
        List<TreeNode<K, V>> nodes = getNodesByKey(key);
        for (TreeNode<K, V> treeNode : nodes) {
            if (uuid.equals(treeNode.getUuid()))
                return treeNode;
        }
        return null;
    }

    public HashMap<K, List<TreeNode<K, V>>> getLookup() {
        return lookup;
    }

    public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
        this.lookup = lookup;
    }

    public void setLevel(int level) {
        this.level = level;
    }

    public void setKey(K key) {
        this.key = key;
    }

    public void setParent(TreeNode<K, V> parent) {
        this.parent = parent;
    }

    public void setChildren(List<TreeNode<K, V>> children) {
        this.children = children;
    }
}

On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <[hidden email]> wrote:
Hi Flavio,

which datatype are you using?

On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
over the last few days we've run a lot of Flink jobs and, from time to time (apparently at random), a different exception arises during their execution...
I hope one of them can help in finding the source of the problem. This time the exception is:

An error occurred while reading the next record.
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
     at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
     at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.UTFDataFormatException: malformed input around byte 42
     at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
     at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
     at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
     at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)

Could this error be caused by a missing implementation of hashCode() and equals()?

Thanks in advance,
Flavio







Re: Another serialization error

rmetzger0
Are you using 1.0.2 on the cluster as well?

On Tue, May 17, 2016 at 3:40 PM, Flavio Pompermaier <[hidden email]> wrote:
I tried to debug my application from Eclipse and I got an infinite recursive call in the TypeExtractor during the analysis of TreeNode (I'm using Flink 1.0.2):

Exception in thread "main" java.lang.StackOverflowError
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1482)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1464)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:736)
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)

Why doesn't this happen on the cluster?




On Tue, May 17, 2016 at 2:23 PM, Flavio Pompermaier <[hidden email]> wrote:
Don't worry Robert,
I know how hard it is to debug such errors :)
I hope that the combination of these 3 errors is somehow related... Anyway, here are the answers:

  • The job (composed of 16 sub-jobs) fails randomly but, usually, the first sub-job after a restart runs successfully
  • In this job I saw all of A, B and C (but after changing parallelism)
  • Yes, the error behaves differently depending on the input data (actually, on the default parallelism and the number of slots in the cluster)
One more interesting thing I fixed in my code that could be (maybe?) the cause of B and C (but not A, because that happened after this problem):
I'm reading and writing data from some Parquet-thrift directories (using the Hadoop input/output formats ParquetThriftInputFormat and ParquetThriftOutputFormat).
In one of the 3 jobs I write a dataset to a Parquet-thrift directory using ds.output(), after decreasing the parallelism and number of slots from M to N.
The first N Parquet files were overwritten (as expected), but the last M-N were not removed (I was expecting the Parquet-thrift directory to be managed as a single unit).
Then, when I read from that directory in the next job, I discovered that the job was actually reading all the files in that folder (I was convinced that, even though the M-N files
were left in that dir, some index file, e.g. _metadata, acted as the entry point for the files in that folder).
I don't know whether this could be a cause of such errors, but I'm reporting it anyway for the sake of completeness, hoping that
this real-life debugging story could be helpful to someone else using Parquet on Flink :)
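
The leftover-files effect described above can be reproduced without Flink or Parquet. The following is only a sketch on the local filesystem using java.nio.file: the class StaleOutputSketch, the part-<i> file names and the temp-directory prefix are all made up for illustration, and a real job writing to HDFS would need the Hadoop FileSystem API instead. It writes M files, rewrites only the first N, and counts what a reader scanning the directory would see, with and without clearing the directory first.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch; file names and layout are assumptions, not Parquet's. */
final class StaleOutputSketch {

    /** Lists the regular files directly inside dir, sorted by path. */
    static List<Path> listFiles(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
    }

    /** Writes one file per parallel writer, overwriting files with the same name. */
    static void writeParts(Path dir, int parallelism) throws IOException {
        for (int i = 0; i < parallelism; i++) {
            Files.write(dir.resolve("part-" + i), ("data-" + i).getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Deletes every regular file directly inside dir. */
    static void deleteFilesIn(Path dir) throws IOException {
        for (Path p : listFiles(dir)) {
            Files.delete(p);
        }
    }

    /** Writes m files, then n files, and returns how many files a reader would find. */
    static int filesAfterRewrite(int m, int n, boolean cleanFirst) {
        try {
            Path dir = Files.createTempDirectory("stale-output-sketch");
            writeParts(dir, m);
            if (cleanFirst) {
                deleteFilesIn(dir); // remove the previous run's output before writing again
            }
            writeParts(dir, n);
            int count = listFiles(dir).size();
            deleteFilesIn(dir); // tidy up the temp directory
            Files.delete(dir);
            return count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + filesAfterRewrite(4, 2, false) + " files");
        System.out.println("with cleanup:    " + filesAfterRewrite(4, 2, true) + " files");
    }
}
```

Without the cleanup step the second run leaves the M-N old files in place, so a reader that scans the whole directory mixes data from both runs.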

Thanks for the support,

Flavio


On Tue, May 17, 2016 at 1:50 PM, Robert Metzger <[hidden email]> wrote:
Is the last one C or A?

How often is it failing (every nth run?) Is it always failing at the same execute() call, or at different ones?
Is it always the exact same exception or is it different ones?
Does the error behave differently depending on the input data?

Sorry for asking so many questions, but these errors can have many causes and just searching the code for potential issues can take a lot of time ;)

On Tue, May 17, 2016 at 12:47 PM, Flavio Pompermaier <[hidden email]> wrote:
Ah sorry, I forgot to mention that I don't use any custom Kryo serializers.

On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <[hidden email]> wrote:
I got those exceptions running 3 different types of jobs. I should have tracked which job produced which error... my bad!
However, the most problematic job is the last one, where I run a series of jobs one after the other (calling env.execute() in a for loop).
If you want, I can share my code with you (in private for the moment, because it's not public yet) or the dashboard screen via Skype while the jobs are running.


Re: Another serialization error

Flavio Pompermaier
Yes I am

On Tue, May 17, 2016 at 3:45 PM, Robert Metzger <[hidden email]> wrote:
Are you using 1.0.2 on the cluster as well?


Re: Another serialization error

Flavio Pompermaier
I found that on the cluster I was using a release version of a dependency that has since changed, so now I get the error on the cluster as well :)

This is caused by the addition of the setParent() method to TreeNode:

    public void setParent(TreeNode<K, V> parent) {
        this.parent = parent;
    }

Without it, Flink at least doesn't complain... but I'm not sure whether things are actually working correctly in that case.
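
A possible reading of why the setter matters: Flink's documented POJO rules require every field to be either public or reachable through both a getter and a setter, so adding setParent() may be what makes the self-referencing parent field eligible for POJO analysis in the first place. The sketch below is a simplified, reflection-only approximation of that one rule. It is not Flink's TypeExtractor code, and PojoFieldRuleSketch and the two stand-in node classes are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical sketch of the "public, or getter plus setter" field rule. */
final class PojoFieldRuleSketch {

    /** Stand-in for TreeNode before the change: 'parent' has a getter only. */
    static class NodeWithoutSetter {
        private NodeWithoutSetter parent;
        public NodeWithoutSetter getParent() { return parent; }
    }

    /** Stand-in for TreeNode after the change: 'parent' has a getter and a setter. */
    static class NodeWithSetter {
        private NodeWithSetter parent;
        public NodeWithSetter getParent() { return parent; }
        public void setParent(NodeWithSetter parent) { this.parent = parent; }
    }

    /** Returns true if the class exposes a public method with the given name. */
    static boolean hasPublicMethod(Class<?> clazz, String name) {
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(name)) {
                return true;
            }
        }
        return false;
    }

    /** Simplified rule: the field is public, or it has both getX and setX. */
    static boolean isAccessibleAsPojoField(Class<?> clazz, String fieldName) {
        Field field;
        try {
            field = clazz.getDeclaredField(fieldName);
        } catch (NoSuchFieldException e) {
            return false;
        }
        if (Modifier.isPublic(field.getModifiers())) {
            return true;
        }
        String suffix = Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
        return hasPublicMethod(clazz, "get" + suffix) && hasPublicMethod(clazz, "set" + suffix);
    }

    public static void main(String[] args) {
        System.out.println("getter only:       "
                + isAccessibleAsPojoField(NodeWithoutSetter.class, "parent"));
        System.out.println("getter and setter: "
                + isAccessibleAsPojoField(NodeWithSetter.class, "parent"));
    }
}
```

Under this reading, the version without the setter is not analyzed field by field at all, which would explain why the recursion only shows up once setParent() is present; whether the serialized data is then correct is a separate question.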


On Tue, May 17, 2016 at 3:53 PM, Flavio Pompermaier <[hidden email]> wrote:
Yes I am


On Tue, May 17, 2016 at 3:45 PM, Robert Metzger <[hidden email]> wrote:
Are you using 1.0.2 on the cluster as well?

On Tue, May 17, 2016 at 3:40 PM, Flavio Pompermaier <[hidden email]> wrote:
I tried to debug my application from Eclipse and I got an infinite recursive call in the TypeExtractor during the analysis of TreeNode (I'm using Flink 1.0.2):

Exception in thread "main" java.lang.StackOverflowError
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1482)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1464)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:736)
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)

Why doesn't this happen on the cluster?
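
The repeating frames in the trace are what one would expect from a type analysis that descends into each field's type without remembering which classes it has already visited. The toy reflection walker below illustrates that failure mode and one common guard (a "seen" set). It is only a sketch of the general idea, not Flink's TypeExtractor, and RecursiveTypeWalk, Node, naiveCount and guardedCount are hypothetical names.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.Set;

final class RecursiveTypeWalk {

    // Hypothetical self-referential type, similar in shape to TreeNode.parent.
    static class Node {
        String key;
        Node parent;
    }

    // True for fields this toy walker does not descend into.
    static boolean skip(Field f) {
        return Modifier.isStatic(f.getModifiers())
                || f.getType().isPrimitive()
                || f.getType().getName().startsWith("java.");
    }

    // Naive walk: recurses into every field type, so Node -> Node -> Node ...
    // never terminates and eventually overflows the stack.
    static int naiveCount(Class<?> type) {
        int count = 1;
        for (Field f : type.getDeclaredFields()) {
            if (!skip(f)) {
                count += naiveCount(f.getType());
            }
        }
        return count;
    }

    // Guarded walk: a class that was already visited is not analyzed again.
    static int guardedCount(Class<?> type, Set<Class<?>> seen) {
        if (!seen.add(type)) {
            return 0;
        }
        int count = 1;
        for (Field f : type.getDeclaredFields()) {
            if (!skip(f)) {
                count += guardedCount(f.getType(), seen);
            }
        }
        return count;
    }

    public static void main(String[] args) {
        try {
            naiveCount(Node.class);
        } catch (StackOverflowError e) {
            System.out.println("naive walk overflowed the stack");
        }
        System.out.println("guarded walk visited "
                + guardedCount(Node.class, new HashSet<>()) + " class(es)");
    }
}
```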




On Tue, May 17, 2016 at 2:23 PM, Flavio Pompermaier <[hidden email]> wrote:
Don't worry Robert,
I know how hard it is to debug such errors :)
I hope that maybe the combination of these 3 errors is somehow related...However these are the answers:

  • The job (composed of 16 sub-jobs) fails randomly but, usually, the first sub-job after the restart runs successfully
  • In this job I saw all of A, B and C (but after changing parallelism)
  • Yes, the error behaves differently depending on the input data (actually on the default parallelism and the number of slots in the cluster)
One more interesting thing I fixed in my code that could (maybe?) be the cause of B and C (but not A, because that happened after this problem):
I'm reading and writing data from some Parquet-Thrift directories (using the Hadoop input/output formats ParquetThriftInputFormat and ParquetThriftOutputFormat).
In one of the 3 jobs I write a dataset to a Parquet-Thrift directory using ds.output(), after decreasing the parallelism and the number of slots from M to N.
The first N Parquet files were overwritten (as expected) but the last M-N were not removed (I was expecting the Parquet-Thrift directory to be managed as a single unit).
Then, when I read from that directory in the next job, I discovered that the job was actually reading all the files in that folder (I was convinced that, even though the M-N files were left in that dir, there was some index file, e.g. _metadata, acting as the entry point for the files in that folder).
I don't know whether this could be a cause of such errors, but I'm reporting it anyway for the sake of completeness, hoping that this real-life debugging story could be helpful to someone else using Parquet on Flink :)
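
To make the leftover-files effect concrete, here is a minimal sketch on the local filesystem, assuming each parallel writer produces one file and that nothing cleans the directory between runs. The part-N file names and the OutputDirCleaner helper are hypothetical; a real job on HDFS would go through the Hadoop FileSystem API (or the output format's overwrite settings) instead of java.nio.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class OutputDirCleaner {

    // Deletes every regular file directly inside dir; returns how many were removed.
    static int clearDirectory(Path dir) {
        try (Stream<Path> entries = Files.list(dir)) {
            List<Path> files = entries.filter(Files::isRegularFile).collect(Collectors.toList());
            for (Path file : files) {
                Files.delete(file);
            }
            return files.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes one (hypothetically named) part file per parallel writer,
    // overwriting files of the same name from an earlier run.
    static void writeParts(Path dir, int parallelism) {
        try {
            for (int i = 1; i <= parallelism; i++) {
                Files.write(dir.resolve("part-" + i), new byte[] {(byte) i});
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Simulates a run with oldParallelism followed by a rerun with newParallelism
    // and returns the number of files a reader of the directory would then see.
    static int filesAfterRerun(int oldParallelism, int newParallelism, boolean cleanFirst) {
        try {
            Path dir = Files.createTempDirectory("parquet-out");
            writeParts(dir, oldParallelism);
            if (cleanFirst) {
                clearDirectory(dir);
            }
            writeParts(dir, newParallelism);
            int visible;
            try (Stream<Path> entries = Files.list(dir)) {
                visible = (int) entries.count();
            }
            clearDirectory(dir);
            Files.delete(dir);
            return visible;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + filesAfterRerun(4, 2, false) + " files");
        System.out.println("with cleanup:    " + filesAfterRerun(4, 2, true) + " files");
    }
}
```

Without the cleanup step the reader still sees the files of the earlier, wider run, which is the situation described above with M=4 and N=2.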

Thanks for the support,

Flavio


On Tue, May 17, 2016 at 1:50 PM, Robert Metzger <[hidden email]> wrote:
The last one is C or A?

How often is it failing (every nth run?) Is it always failing at the same execute() call, or at different ones?
Is it always the exact same exception or is it different ones?
Does the error behave differently depending on the input data?

Sorry for asking so many questions, but these errors can have many causes and just searching the code for potential issues can take a lot of time ;)

On Tue, May 17, 2016 at 12:47 PM, Flavio Pompermaier <[hidden email]> wrote:
Ah sorry, I forgot to mention that I don't use any custom kryo serializers..

On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <[hidden email]> wrote:
I got those exceptions running 3 different types of jobs. I should have tracked which job produced which error... my bad!
However, the most problematic job is the last one, where I run a series of jobs one after the other (calling env.execute() in a for loop).
If you want I can share my code with you (in private for the moment because it's not public yet) or the dashboard screen via Skype while the jobs are running.


On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <[hidden email]> wrote:
Hi Flavio,
thank you for providing additional details.
I don't think that missing hashCode / equals() implementations cause such an error. They can cause wrong sorting or partitioning of the data, but the serialization should still work properly.
I suspect the issue is somewhere in the serialization stack.
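
The distinction can be illustrated outside Flink. In the sketch below, plain Java collections and Java serialization stand in for Flink's grouping and serializers (an assumption made purely for illustration): a class without equals()/hashCode() groups incorrectly, yet still round-trips through serialization intact. EqualsHashCodeDemo and Key are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class EqualsHashCodeDemo {

    // Hypothetical key type that does NOT override equals()/hashCode().
    static class Key implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;

        Key(String name) {
            this.name = name;
        }
    }

    // Groups keys with a HashMap and returns the number of distinct groups.
    static int groupCount(Key... keys) {
        Map<Key, Integer> groups = new HashMap<>();
        for (Key k : keys) {
            groups.merge(k, 1, Integer::sum);
        }
        return groups.size();
    }

    // Serializes and deserializes a key with standard Java serialization.
    static Key roundTrip(Key key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(key);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Key) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Two keys with the same content end up in separate groups.
        System.out.println("groups: " + groupCount(new Key("a"), new Key("a")));
        // Serialization is unaffected by the missing equals()/hashCode().
        System.out.println("round trip: " + roundTrip(new Key("a")).name);
    }
}
```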

Are you registering any custom kryo serializers?



From your past emails, you saw the following different exceptions:

A)  Caused by: java.io.UTFDataFormatException: malformed input around byte 42
B)  Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
C)  Caused by: java.lang.ArrayIndexOutOfBoundsException: -2

Were they all caused by the same job, or different ones?
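
For context on exception A: a UTFDataFormatException of this kind typically means the reader hit bytes that are not valid modified UTF-8 at the position where it expected a string, for example because the stream is corrupted or is being read at the wrong offset. The sketch below reproduces the same class of error with java.io.DataInputStream, used here as a stand-in for Flink's AbstractPagedInputView.readUTF (an assumption; the two are separate implementations). MalformedUtfDemo and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;

final class MalformedUtfDemo {

    // Encodes a string in the length-prefixed modified UTF-8 format of writeUTF.
    static byte[] encode(String s) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(s);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Decodes with readUTF and reports either the string or the decoding failure.
    static String tryDecode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return "ok: " + in.readUTF();
        } catch (UTFDataFormatException e) {
            return "UTFDataFormatException: " + e.getMessage();
        } catch (IOException e) {
            return "IOException: " + e.getMessage();
        }
    }

    // Returns a copy of data with one payload byte replaced by 0x80,
    // a continuation byte that cannot start a character.
    static byte[] corrupt(byte[] data, int payloadIndex) {
        byte[] copy = data.clone();
        copy[2 + payloadIndex] = (byte) 0x80; // skip the 2-byte length prefix
        return copy;
    }

    public static void main(String[] args) {
        byte[] good = encode("hello world");
        System.out.println(tryDecode(good));
        System.out.println(tryDecode(corrupt(good, 3)));
    }
}
```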


On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <[hidden email]> wrote:
Hi Robert,
in this specific case the classes involved are:
  • Tuple3<String, String, IndexAttributeToExpand> (IndexAttributeToExpand is a POJO extending another class, and neither of them implements equals and hashCode)
  • Tuple3<String, String, TreeNode<String, Map<String, Set<String>>>> (TreeNode is a POJO containing other TreeNodes, and it doesn't implement equals and hashCode)
  • Now I've added hashCode and equals to both classes in order to be aligned with POJO policies; however, the job finished correctly after stopping and restarting the cluster... usually when I get a strange serialization exception I stop and restart the cluster and everything works.
The TreeNode class (the recursive one) is actually the following:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

import com.fasterxml.jackson.annotation.JsonIgnore;

public class TreeNode<K,V> implements Serializable {

    private static final long serialVersionUID = 1L;
    private int level = 0;

    private String uuid;
    private K key;
    private V value;
    @JsonIgnore
    private TreeNode<K,V> parent;
    private List<TreeNode<K,V>> children;
    @JsonIgnore
    private HashMap<K, List<TreeNode<K,V>>> lookup;

    public TreeNode(K key, V value) {
        this.level = 0;
        this.key = key;
        this.uuid = UUID.randomUUID().toString();
        this.value = value;
        this.children = new LinkedList<TreeNode<K,V>>();
        List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K,V>>();
        thisAsList.add(this);
        this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
        this.lookup.put(key, thisAsList);
    }

    public TreeNode<K,V> addChild(K key, V value) {
        TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
        childNode.level = level + 1;
        childNode.parent = this;
        childNode.lookup = lookup;
        childNode.uuid = UUID.randomUUID().toString();
        this.children.add(childNode);
        List<TreeNode<K, V>> l = lookup.get(childNode.key);
        if (l == null) {
            l = new ArrayList<TreeNode<K,V>>();
            lookup.put(childNode.key, l);
        }
        l.add(childNode);
        return childNode;
    }

    public boolean isLeaf() {
        return children.isEmpty();
    }

    public int getLevel() {
        return level;
    }

    public TreeNode<K,V> getParent() {
        return parent;
    }

    public V getValue() {
        return value;
    }

    public String getUuid() {
        return uuid;
    }

    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    public List<TreeNode<K,V>> getChildren() {
        return children;
    }

    public List<TreeNode<K, V>> getNodesByKey(K key) {
        return lookup.get(key);
    }

    public K getKey() {
        return key;
    }

    public List<TreeNode<K,V>> getLeafs() {
        List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
        getLeafs(ret);
        return ret;
    }

    private void getLeafs(List<TreeNode<K, V>> ret) {
        if (children.isEmpty())
            ret.add(this);
        for (TreeNode<K, V> child : children) {
            child.getLeafs(ret);
        }
    }

    @Override
    public String toString() {
        return toString(true);
    }

    public String toString(boolean withChildren) {
        if (key == null)
            return super.toString();
        StringBuffer ret = new StringBuffer();
        for (int i = 0; i < level; i++) {
            ret.append(" >");
        }
        ret.append(" " + key.toString());
        if (withChildren) {
            for (TreeNode<K, V> child : children) {
                ret.append("\n").append(child.toString());
            }
        }
        return ret.toString();
    }

    public void setValue(V value) {
        this.value = value;
    }

    public void remove(List<TreeNode<K, V>> nodes) {
        for (TreeNode<K, V> n : nodes) {
            removeChildren(n);
        }
        for (TreeNode<K, V> n : nodes) {
            TreeNode<K, V> parent = n.getParent();
            if (parent == null)
                continue;
            parent.children.remove(n);
        }
    }

    private void removeChildren(TreeNode<K, V> node) {
        lookup.remove(node.getUuid());
        if (node.children.isEmpty())
            return;
        Iterator<TreeNode<K, V>> it = node.children.iterator();
        while (it.hasNext()) {
            TreeNode<K, V> child = (TreeNode<K, V>) it.next();
            removeChildren(child);
            it.remove();
        }
    }

    public void clear() {
        this.key = null;
        this.value = null;
        this.uuid = null;
        for (TreeNode<K, V> child : children) {
            child.clear();
        }
        this.children.clear();
        this.lookup.clear();
    }

    public TreeNode<K, V> getNodeById(K key, String uuid) {
        List<TreeNode<K, V>> nodes = getNodesByKey(key);
        for (TreeNode<K, V> treeNode : nodes) {
            if (uuid.equals(treeNode.getUuid()))
                return treeNode;
        }
        return null;
    }

    public HashMap<K, List<TreeNode<K, V>>> getLookup() {
        return lookup;
    }

    public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
        this.lookup = lookup;
    }

    public void setLevel(int level) {
        this.level = level;
    }

    public void setKey(K key) {
        this.key = key;
    }

    public void setParent(TreeNode<K, V> parent) {
        this.parent = parent;
    }

    public void setChildren(List<TreeNode<K, V>> children) {
        this.children = children;
    }
}

On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <[hidden email]> wrote:
Hi Flavio,

which datatype are you using?

On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
over the past few days we've run a lot of Flink jobs, and from time to time (apparently at random) a different exception arises during their execution...
I hope one of them can help in finding the source of the problem. This time the exception is:

An error occurred while reading the next record.
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
     at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
     at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.UTFDataFormatException: malformed input around byte 42
     at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
     at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
     at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
     at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)

Could this error be caused by a missing implementation of hashCode() and equals()?

Thanks in advance,
Flavio