Hi,
 
Part of my bachelor thesis is an implementation of the Semi-Clustering algorithm [1].
I’m using the scatter-gather iteration. Each vertex has to know its neighbors and the values of the edges between them. Because Gelly’s Vertex doesn’t provide both pieces of information, I wrote a CustomVertexValue class. Each object contains a set of edges and a list of another custom class called SemiCluster, which contains a list of vertices and a double value.
Now I’m able to create vertices like Vertex<Double, CustomVertexValue>.
Unfortunately, I get an exception when I run my scatter-gather iteration:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:900) 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843) 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843) 
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null 
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619) 
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094) 
at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:203) 
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) 
at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146) 
at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107) 
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null 
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799) 
Caused by: java.lang.NullPointerException 
at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1230) 
at java.util.ArrayList$SubList.size(ArrayList.java:1040) 
at java.util.AbstractList.add(AbstractList.java:108) 
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) 
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) 
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) 
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232) 
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:246) 
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144) 
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) 
at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57) 
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109) 
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72) 
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42) 
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59) 
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:973) 
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796) 
Process finished with exit code 1 
——————————— 
Maybe Gelly isn’t able to handle my CustomVertexValue?
The ScatterFunction works fine.
Thanks a lot 
Best, 
Marc 
[1] http://www.dcs.bbk.ac.uk/~dell/teaching/cc/paper/sigmod10/p135-malewicz.pdf chapter 5.4 (page 141) 
	
	
	
	
	 Looks like java.util.ArrayList$SubList does not work out of the box with Kryo / Flink. 
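For context, here is a minimal JDK-only sketch (not Flink or Gelly code; the names `SubListCopy` and `firstN` are made up for illustration). `List.subList` returns a view backed by the original list rather than a standalone list, and on an ArrayList that view is the SubList class from the stack trace. Assuming the vertex value only needs the elements and not the live view, copying the view into a new ArrayList before storing it should keep that class away from Kryo altogether.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SubListCopy {

    // Hypothetical helper: returns an independent ArrayList holding the
    // first n elements of source (or all of them if source is shorter).
    static <T> List<T> firstN(List<T> source, int n) {
        int end = Math.min(n, source.size());
        // subList(...) is only a view backed by 'source';
        // the ArrayList copy constructor detaches it.
        return new ArrayList<>(source.subList(0, end));
    }

    public static void main(String[] args) {
        List<Integer> all = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));

        List<Integer> view = all.subList(0, 3); // view class, the one Kryo tripped over
        List<Integer> copy = firstN(all, 3);    // plain java.util.ArrayList

        System.out.println(view.getClass().getName());
        System.out.println(copy.getClass().getName());
    }
}
```

The copy costs one extra allocation per call, which is usually acceptable for short lists such as a bounded list of semi-clusters.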
	
	
	
Try registering a custom serializer for it...

On Wed, May 10, 2017 at 4:16 PM, Kaepke, Marc <[hidden email]> wrote:
	
Thanks for the hint.
I looked into it and found some strange behavior.
If I change from EdgeDirection.ALL (which I need) to EdgeDirection.OUT (or .IN), everything seems okay, even with the sublist operation still in place.
Then I replaced the sublist with the entire list, and there was no exception (EdgeDirection.ALL/IN/OUT all worked). With .ALL, however, the algorithm ran until "maximumNumberOfIterations" was reached. How can I limit it, and how can I set the termination conditions?
Best, 
Marc 
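
On the termination question: as far as I understand Gelly's scatter-gather model, the iteration ends either at the maximum number of iterations or as soon as a superstep updates no vertex value. A gather step that calls setNewVertexValue in every superstep would therefore always run to the limit. The following is a plain-Java simulation of that stopping rule, not Gelly code; `ConvergenceSketch`, `run`, and the minimum-label example are assumptions chosen only to illustrate the idea.

```java
import java.util.Arrays;

public class ConvergenceSketch {

    // Propagates the minimum label along undirected edges and returns the
    // number of supersteps executed. The loop stops early once a superstep
    // changes no label, otherwise at maxIterations.
    static int run(int[] labels, int[][] edges, int maxIterations) {
        int supersteps = 0;
        boolean changed = true;
        while (changed && supersteps < maxIterations) {
            changed = false;
            int[] next = Arrays.copyOf(labels, labels.length);
            for (int[] e : edges) {
                // Messages travel in both directions, similar to EdgeDirection.ALL.
                if (labels[e[0]] < next[e[1]]) { next[e[1]] = labels[e[0]]; changed = true; }
                if (labels[e[1]] < next[e[0]]) { next[e[0]] = labels[e[1]]; changed = true; }
            }
            // Only real changes count as updates; equal values are ignored.
            System.arraycopy(next, 0, labels, 0, labels.length);
            supersteps++;
        }
        return supersteps;
    }

    public static void main(String[] args) {
        int[] labels = {0, 1, 2, 3};
        int[][] edges = {{0, 1}, {1, 2}, {2, 3}};
        int steps = run(labels, edges, 100);
        System.out.println(steps + " supersteps, labels " + Arrays.toString(labels));
    }
}
```

Carried over to Gelly, the equivalent would presumably be to compare the newly computed CustomVertexValue with the current one and call setNewVertexValue only when they differ, which in turn requires a meaningful equals() on the custom classes.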