Hi Robert, in this specific case the classes involved are:
- Tuple3<String, String, IndexAttributeToExpand> (IndexAttributeToExpand is a POJO that extends another class, and neither of them implements equals() and hashCode())
- Tuple3<String, String, TreeNode<String, Map<String, Set<String>>>> (TreeNode is a POJO that contains other TreeNodes, and it doesn't implement equals() and hashCode())

I've now added hashCode() and equals() to both classes to align them with the POJO conventions. However, the job finished correctly after I stopped and restarted the cluster... Usually, when I get a strange serialization exception, I stop and restart the cluster and everything works.
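A note on adding equals()/hashCode() to a recursive class like TreeNode: its `parent` field points back up the tree, and the shared `lookup` map contains the node itself, so equals()/hashCode() that include those fields can recurse without terminating. Below is a minimal sketch on a hypothetical, simplified `SimpleNode` (String key and value, no lookup map), not the actual class, that compares only key, value and children. It also has a public no-argument constructor and getters/setters, because Flink's documented POJO rules ask for a public class with a public no-arg constructor and fields that are public or reachable through getters and setters; TreeNode as posted only has a (key, value) constructor, so Flink would likely not analyze it as a POJO.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified stand-in for TreeNode (not the original class).
public class SimpleNode {
    private String key;
    private String value;
    private SimpleNode parent;                      // back-reference: excluded from equals/hashCode
    private List<SimpleNode> children = new ArrayList<>();

    public SimpleNode() {}                          // public no-arg constructor (Flink POJO rule)

    public SimpleNode(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public SimpleNode addChild(String key, String value) {
        SimpleNode child = new SimpleNode(key, value);
        child.parent = this;
        children.add(child);
        return child;
    }

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }
    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }
    public SimpleNode getParent() { return parent; }
    public void setParent(SimpleNode parent) { this.parent = parent; }
    public List<SimpleNode> getChildren() { return children; }
    public void setChildren(List<SimpleNode> children) { this.children = children; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleNode)) return false;
        SimpleNode other = (SimpleNode) o;
        // Compare downwards only; following `parent` would walk back up
        // the tree and recurse forever.
        return Objects.equals(key, other.key)
                && Objects.equals(value, other.value)
                && Objects.equals(children, other.children);
    }

    @Override
    public int hashCode() {
        // Same fields as equals(), again ignoring `parent`.
        return Objects.hash(key, value, children);
    }

    public static void main(String[] args) {
        SimpleNode a = new SimpleNode("root", "r");
        a.addChild("c1", "x");
        SimpleNode b = new SimpleNode("root", "r");
        b.addChild("c1", "x");
        System.out.println(a.equals(b));                  // true
        System.out.println(a.hashCode() == b.hashCode()); // true
    }
}
```

Comparing downwards only keeps both methods finite for any tree; the trade-off is that two identical subtrees hanging off different parents compare as equal.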
The TreeNode class (the recursive one) is actually the following:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

import com.fasterxml.jackson.annotation.JsonIgnore;

public class TreeNode<K, V> implements Serializable {

    private static final long serialVersionUID = 1L;

    private int level = 0;
    private String uuid;
    private K key;
    private V value;
    @JsonIgnore
    private TreeNode<K, V> parent;
    private List<TreeNode<K, V>> children;
    // Index from key to all nodes with that key; shared by every node of the tree
    @JsonIgnore
    private HashMap<K, List<TreeNode<K, V>>> lookup;

    public TreeNode(K key, V value) {
        this.level = 0;
        this.key = key;
        this.uuid = UUID.randomUUID().toString();
        this.value = value;
        this.children = new LinkedList<TreeNode<K, V>>();
        List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K, V>>();
        thisAsList.add(this);
        this.lookup = new HashMap<K, List<TreeNode<K, V>>>();
        this.lookup.put(key, thisAsList);
    }

    public TreeNode<K, V> addChild(K key, V value) {
        TreeNode<K, V> childNode = new TreeNode<K, V>(key, value);
        childNode.level = level + 1;
        childNode.parent = this;
        childNode.lookup = lookup; // the child joins the parent's shared index
        childNode.uuid = UUID.randomUUID().toString();
        this.children.add(childNode);
        List<TreeNode<K, V>> l = lookup.get(childNode.key);
        if (l == null) {
            l = new ArrayList<TreeNode<K, V>>();
            lookup.put(childNode.key, l);
        }
        l.add(childNode);
        return childNode;
    }

    public boolean isLeaf() {
        return children.isEmpty();
    }

    public int getLevel() {
        return level;
    }

    public TreeNode<K, V> getParent() {
        return parent;
    }

    public V getValue() {
        return value;
    }

    public String getUuid() {
        return uuid;
    }

    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    public List<TreeNode<K, V>> getChildren() {
        return children;
    }

    public List<TreeNode<K, V>> getNodesByKey(K key) {
        return lookup.get(key);
    }

    public K getKey() {
        return key;
    }

    public List<TreeNode<K, V>> getLeafs() {
        List<TreeNode<K, V>> ret = new ArrayList<TreeNode<K, V>>();
        getLeafs(ret);
        return ret;
    }

    private void getLeafs(List<TreeNode<K, V>> ret) {
        if (children.isEmpty())
            ret.add(this);
        for (TreeNode<K, V> child : children) {
            child.getLeafs(ret);
        }
    }

    @Override
    public String toString() {
        return toString(true);
    }

    public String toString(boolean withChildren) {
        if (key == null)
            return super.toString();
        StringBuilder ret = new StringBuilder();
        for (int i = 0; i < level; i++) {
            ret.append(" >");
        }
        ret.append(" " + key.toString());
        if (withChildren) {
            for (TreeNode<K, V> child : children) {
                ret.append("\n").append(child.toString());
            }
        }
        return ret.toString();
    }

    public void setValue(V value) {
        this.value = value;
    }

    public void remove(List<TreeNode<K, V>> nodes) {
        for (TreeNode<K, V> n : nodes) {
            removeChildren(n);
        }
        for (TreeNode<K, V> n : nodes) {
            TreeNode<K, V> parent = n.getParent();
            if (parent == null)
                continue;
            parent.children.remove(n);
        }
    }

    private void removeChildren(TreeNode<K, V> node) {
        // lookup is keyed by node key (not uuid): drop the node from its key's list
        List<TreeNode<K, V>> sameKey = lookup.get(node.getKey());
        if (sameKey != null)
            sameKey.remove(node);
        if (node.children.isEmpty())
            return;
        Iterator<TreeNode<K, V>> it = node.children.iterator();
        while (it.hasNext()) {
            TreeNode<K, V> child = it.next();
            removeChildren(child);
            it.remove();
        }
    }

    public void clear() {
        this.key = null;
        this.value = null;
        this.uuid = null;
        for (TreeNode<K, V> child : children) {
            child.clear();
        }
        this.children.clear();
        this.lookup.clear();
    }

    public TreeNode<K, V> getNodeById(K key, String uuid) {
        List<TreeNode<K, V>> nodes = getNodesByKey(key);
        if (nodes == null)
            return null;
        for (TreeNode<K, V> treeNode : nodes) {
            if (uuid.equals(treeNode.getUuid()))
                return treeNode;
        }
        return null;
    }

    public HashMap<K, List<TreeNode<K, V>>> getLookup() {
        return lookup;
    }

    public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
        this.lookup = lookup;
    }

    public void setLevel(int level) {
        this.level = level;
    }

    public void setKey(K key) {
        this.key = key;
    }

    public void setParent(TreeNode<K, V> parent) {
        this.parent = parent;
    }

    public void setChildren(List<TreeNode<K, V>> children) {
        this.children = children;
    }
}

On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <[hidden email]> wrote:
Hi Flavio,
which data types are you using?

On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
over the past few days we've run a lot of Flink jobs, and from time to time (apparently at random) a different exception arises during their execution. I hope one of them can help in finding the source of the problem. This time the exception is:
An error occurred while reading the next record.
at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.UTFDataFormatException: malformed input around byte 42
at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)

Could this error be caused by a missing implementation of hashCode() and equals()?
Thanks in advance,
Flavio
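The "malformed input around byte N" message has the same format as the one thrown by java.io.DataInputStream.readUTF, and Flink's AbstractPagedInputView.readUTF appears to follow the same modified UTF-8 decoding. As a plain-JDK sketch (no Flink involved; `MalformedUtfDemo` and `roundTripWithCorruptByte` are hypothetical names), this shows what the exception means at the byte level: the bytes at the read position are not valid modified UTF-8, for example because the reader is out of sync with what the writer produced or the data was corrupted. It does not by itself identify which component wrote the unexpected bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;

public class MalformedUtfDemo {

    // Writes `s` with writeUTF (2-byte length prefix + modified UTF-8 bytes),
    // overwrites one payload byte with 0xFF, then reads the string back.
    static String roundTripWithCorruptByte(String s, int payloadIndex) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(s);
        }
        byte[] data = bytes.toByteArray();
        // Skip the 2-byte length prefix; 0xFF never occurs in modified UTF-8.
        data[2 + payloadIndex] = (byte) 0xFF;
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readUTF();
        }
    }

    public static void main(String[] args) throws IOException {
        try {
            roundTripWithCorruptByte("hello", 2);
        } catch (UTFDataFormatException e) {
            System.out.println(e.getMessage()); // malformed input around byte 2
        }
    }
}
```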