The error looks really strange. Flavio, could you put together a test program with example data and configuration to reproduce the problem? Given that, we could try to debug it.
Cheers, Till |
Do you have any suggestions about how to reproduce the error on a subset of the data? I'm trying to change the following, but I can't find a configuration that causes the error :(

    private static ExecutionEnvironment getLocalExecutionEnv() {
        org.apache.flink.configuration.Configuration c = new org.apache.flink.configuration.Configuration();
        c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, "/tmp");
        c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/tmp");
        c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.9f);
        c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
        c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
        c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
        c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
        env.setParallelism(16);
        env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
        return env;
    }

On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann <[hidden email]> wrote:
|
Still not able to reproduce the error locally, only remotely :)
Any suggestions about how to try to reproduce it locally on a subset of the data? This time I had:

    com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
        at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)

Best,
Flavio

On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier <[hidden email]> wrote:
|
Running the job with the log level set to DEBUG made it run successfully... Is this meaningful? Maybe slowing down the threads a little bit helps the serialization?
On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier <[hidden email]> wrote:
|
Hi! That is a pretty thing indeed :-) Will try to look into this in a few days... Stephan On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier <[hidden email]> wrote:
|
I tried to reproduce the error on a subset of the data: reducing the available memory and greatly increasing GC pressure (by creating a lot of useless objects in one of the first UDFs — a sketch of such a UDF follows below) caused this error:

    Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: / by zero
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
    Caused by: java.lang.ArithmeticException: / by zero
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

I hope this helps to narrow down the debugging area :)

Best,
Flavio

On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <[hidden email]> wrote:
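A minimal sketch of the kind of garbage-generating UDF this refers to — illustrative only; the element type and allocation counts are made up, not Flavio's actual code:

    import org.apache.flink.api.common.functions.MapFunction;

    // A map function that allocates lots of short-lived objects so the
    // garbage collector runs far more often, making timing-dependent
    // (de)serialization problems more likely to surface.
    public class GcPressureMapper implements MapFunction<String, String> {
        private static volatile int sink; // prevents the JIT from dropping the loop

        @Override
        public String map(String value) {
            for (int i = 0; i < 10_000; i++) {
                byte[] garbage = new byte[1024]; // short-lived allocation
                sink += garbage.length;
            }
            return value; // pass the record through unchanged
        }
    }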
|
Hi Flavio, can you privately share the source code of your Flink job with me? I'm wondering whether the issue might be caused by a version mixup on the cluster (different JVM versions? different files in the lib/ folder?). How are you deploying the Flink job? Regards, Robert On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier <[hidden email]> wrote:
|
Over the last week I was able to run the job several times without any error; then I just recompiled it and the error reappeared :(
This time I had:

    java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at main(DataInference.java:372)) -> Map (Map at writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
        at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
        ... 3 more
    Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
    Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
    Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
        at java.util.ArrayList.elementData(ArrayList.java:418)
        at java.util.ArrayList.get(ArrayList.java:431)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
        at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
        ... 5 more

I can't really find a way to understand what is causing the error :(

On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <[hidden email]> wrote:
|
This time I had the following exception (obviously it.okkam.flinj.model.pojo.TipoSoggetto should be it.okkam.flink.model.pojo.TipoSoggetto).
    java.lang.RuntimeException: Cannot instantiate class.
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
        at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassNotFoundException: it.okkam.flinj.model.pojo.TipoSoggetto
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
        ... 10 more

On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <[hidden email]> wrote:
|
Unless someone really invests time into debugging this I fear that the different misspellings are not really helpful, Flavio.

On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier <[hidden email]> wrote:
|
After "some" digging into this problem I'm quite convinced that the problem is caused by a missing reset of the buffer during the Kryo deserialization, likewise to what has been fixed by FLINK-2800 (https://github.com/apache/flink/pull/1308/files).Could someone help me in writing a test to see whether this situation is correctly handled by Flink? I saw for example that in KryoGenericTypeSerializerTest there's a test to test the EOFException triggered by the deserialization but it doesn't test what happens making another call to the serializer after such Exception occurs (and thus check whether the buffers are correctly cleared or not). I'll try to start my testing part from there for the moment if anybody has no objections.. On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi <[hidden email]> wrote: Unless someone really invests time into debugging this I fear that the |
That's nice. Can you try it on your cluster with an added "reset" call on the buffer? On Tue, 7 Jun 2016 at 14:35 Flavio Pompermaier <[hidden email]> wrote:
|
Hi Aljoscha, of course I can :) Thanks for helping me. Do you think calling reset() is the right thing to do? Actually, I don't know whether this is meaningful or not, but I already ran the job successfully once on the cluster (a second attempt is currently running) after my accidental modification to the KryoException handling in KryoSerializer.deserialize()... My intention was to reset the input buffer by calling the clear() method on it, so I copied the line from above but forgot to change the variable, and I ended up calling output.clear() instead of input.reset()... For this reason I say that I don't know if this is meaningful or not. On Tue, Jun 7, 2016 at 2:50 PM, Aljoscha Krettek <[hidden email]> wrote:
|
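For reference, a simplified sketch of the change being discussed — illustrative only, not the actual Flink code or a proposed patch. Here kryo and input stand for the KryoSerializer's existing fields, the re-creation of the Input on a new source view is omitted for brevity, and the EOFException unwrapping mirrors the existing handling:

    // Simplified sketch of KryoSerializer.deserialize() with a buffer
    // reset added on failure -- an assumption about the shape of the fix,
    // not the real patch.
    @SuppressWarnings("unchecked")
    public T deserialize(DataInputView source) throws IOException {
        try {
            return (T) kryo.readClassAndObject(input);
        }
        catch (KryoException ke) {
            // Drop whatever is left in the partially consumed buffer so the
            // next call does not resume from a stale position (the missing
            // reset suspected in this thread; Flavio's accidental variant
            // called output.clear() instead).
            input.setBuffer(new byte[0]);

            Throwable cause = ke.getCause();
            if (cause instanceof EOFException) {
                throw (EOFException) cause;
            }
            throw ke;
        }
    }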
After a second look at KryoSerializer, I fear that the Input and Output are never closed... am I right?
On Tue, Jun 7, 2016 at 3:06 PM, Flavio Pompermaier <[hidden email]> wrote:
|
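For comparison, standalone Kryo code would normally close both ends explicitly; Kryo's Input and Output extend InputStream and OutputStream, so try-with-resources works. A minimal sketch (the file path is just an example):

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class KryoCloseExample {
        public static void main(String[] args) throws IOException {
            Kryo kryo = new Kryo();

            // Output extends OutputStream; close() flushes buffered bytes
            // to the underlying stream and then closes it.
            try (Output out = new Output(new FileOutputStream("/tmp/record.bin"))) {
                kryo.writeClassAndObject(out, "some record");
            }

            // Input extends InputStream; close() closes the wrapped stream.
            try (Input in = new Input(new FileInputStream("/tmp/record.bin"))) {
                System.out.println(kryo.readClassAndObject(in));
            }
        }
    }

Inside the KryoSerializer, though, the Input/Output wrap Flink-managed DataInputView/DataOutputView streams, so closing them there would also close Flink's own streams — which may be why they are left open.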