Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Till Rohrmann
The error looks really strange. Flavio, could you compile a test program with example data and configuration to reproduce the problem? Given that, we could try to debug it.

Cheers,
Till

Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Flavio Pompermaier
Do you have any suggestion about how to reproduce the error on a subset of the data?
I'm trying to change the following configuration, but I can't find a combination that causes the error :(

// Requires: org.apache.flink.api.java.ExecutionEnvironment,
// org.apache.flink.configuration.ConfigConstants, org.joda.time.DateTime,
// and a JodaDateTimeSerializer (e.g. from the de.javakaffee kryo-serializers library).
private static ExecutionEnvironment getLocalExecutionEnv() {
        org.apache.flink.configuration.Configuration c = new org.apache.flink.configuration.Configuration();
        c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, "/tmp");
        c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/tmp");
        c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.9f);
        c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
        c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
        c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
        c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
        env.setParallelism(16);
        // Joda DateTime is not handled by Flink's POJO serializer, so a
        // dedicated Kryo serializer is registered for it.
        env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
        return env;
    }
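
A hypothetical driver for such reproduction attempts (the input path and the identity map below are placeholders, not the actual job) could look like this:

// Hypothetical reproduction driver; requires org.apache.flink.api.common.functions.MapFunction
// and org.apache.flink.api.java.io.DiscardingOutputFormat.
public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = getLocalExecutionEnv();
    env.readTextFile("/tmp/sample-input") // placeholder path to a data subset
       .map(new MapFunction<String, String>() {
           @Override
           public String map(String line) {
               return line; // stand-in for the real UDF chain
           }
       })
       .output(new DiscardingOutputFormat<String>());
    env.execute("kryo-repro-attempt");
}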

Best,
Flavio



Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Flavio Pompermaier
Still not able to reproduce the error locally, only remotely :)
Any suggestions about how to reproduce it locally on a subset of the data?
This time I had:

com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
        at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)

Best,
Flavio



Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Flavio Pompermaier
Running the job with the log level set to DEBUG made it run successfully... Is this meaningful? Maybe slowing the threads down a little bit helps serialization?



Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Stephan Ewen
Hi!

That is a pretty thing indeed :-) Will try to look into this in a few days...

Stephan




Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Flavio Pompermaier
I tried to reproduce the error on a subset of the data: reducing the available memory and increasing GC pressure a lot (by creating many useless objects in one of the first UDFs) actually caused this error:

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: / by zero
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.lang.ArithmeticException: / by zero
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

I hope this helps narrow down the debugging area :)
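
A minimal sketch of the kind of GC-pressure UDF described above (the record type and allocation sizes are illustrative assumptions, not the actual code):

// Illustrative only: allocates short-lived garbage on every record to put
// pressure on the GC. Requires org.apache.flink.api.common.functions.MapFunction.
public static class GcPressureMapper implements MapFunction<String, String> {
    @Override
    public String map(String value) {
        for (int i = 0; i < 10_000; i++) {
            byte[] garbage = new byte[1024]; // dies young, feeding the GC
        }
        return value;
    }
}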

Best,
Flavio



Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

rmetzger0
Hi Flavio,

can you privately share the source code of your Flink job with me?

I'm wondering whether the issue might be caused by a mixup between different versions on the cluster (different JVM versions? different files in the lib/ folder?). How are you deploying the Flink job?
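
One way to check for such a mixup from inside the job (a hedged sketch; the probed class is just an example) is to log the JVM version and where a suspect class was loaded from on each TaskManager:

// Illustrative probe; requires org.apache.flink.api.common.functions.RichMapFunction
// and org.apache.flink.configuration.Configuration.
public static class EnvProbe extends RichMapFunction<String, String> {
    @Override
    public void open(Configuration parameters) {
        System.out.println("JVM: " + System.getProperty("java.version"));
        // getCodeSource() can be null for bootstrap classes; Kryo comes from a jar.
        System.out.println("Kryo loaded from: " + com.esotericsoftware.kryo.Kryo.class
                .getProtectionDomain().getCodeSource().getLocation());
    }

    @Override
    public String map(String value) {
        return value;
    }
}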

Regards,
Robert




Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Flavio Pompermaier
Over the last week I was able to run the job several times without any error; then I just recompiled it and the error reappeared :(
This time I had:

java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at main(DataInference.java:372)) -> Map (Map at writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
	at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
	at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
	... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
	at java.util.ArrayList.elementData(ArrayList.java:418)
	at java.util.ArrayList.get(ArrayList.java:431)
	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
	at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
	... 5 more

I can't really find a way to understand what is causing the error :(
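
As a side check (a standalone sketch, independent of Flink), the registered custom serializer can at least be verified to round-trip on its own; if this fails, the serializer itself is broken, and if it passes, the corruption happens elsewhere:

// Standalone round-trip check for the registered Joda-Time serializer.
// Assumes the de.javakaffee kryo-serializers library is on the classpath.
Kryo kryo = new Kryo();
kryo.register(DateTime.class, new JodaDateTimeSerializer());

Output out = new Output(1024, -1); // growable buffer
kryo.writeClassAndObject(out, DateTime.now());
out.flush();

Input in = new Input(out.toBytes());
DateTime back = (DateTime) kryo.readClassAndObject(in);
System.out.println(back); // should print the same instant that was written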



Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Flavio Pompermaier
This time I had the following exception (obviously it.okkam.flinj.model.pojo.TipoSoggetto should be it.okkam.flink.model.pojo.TipoSoggetto):
java.lang.RuntimeException: Cannot instantiate class.
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
	at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: it.okkam.flinj.model.pojo.TipoSoggetto
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
	... 10 more




Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Ufuk Celebi
Unless someone really invests time into debugging this, I fear that the different misspellings are not really helpful, Flavio.

On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier
<[hidden email]> wrote:

> This time I had the following exception (obviously
> it.okkam.flinj.model.pojo.TipoSoggetto should be
> it.okkam.flink.model.pojo.TipoSoggetto).
>
> java.lang.RuntimeException: Cannot instantiate class.
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
> at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> it.okkam.flinj.model.pojo.TipoSoggetto
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
> ... 10 more
>
>
>
> On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <[hidden email]>
> wrote:
>>
>> The last week I've been able to run the job several times without any
>> error. then I just recompiled it and the error reappered :(
>> This time I had:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
>> at main(DataInference.java:372)) -> Map (Map at
>> writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error: Error
>> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated
>> due to an exception: Serializer consumed more bytes than the record had.
>> This indicates broken serialization. If you are using custom serialization
>> types (Value or Writable), check their serialization methods. If you are
>> using a Kryo-serialized type, check the corresponding Kryo serializer.
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>> at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>> at
>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>> at
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>> at
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> at
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>> at java.util.ArrayList.elementData(ArrayList.java:418)
>> at java.util.ArrayList.get(ArrayList.java:431)
>> at
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>> at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>> at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>> at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>> ... 5 more
>>
>>
>> I can't really find a way to understand what is causing the error :(
>>
>>
>> On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <[hidden email]>
>> wrote:
>>>
>>> Hi Flavio,
>>>
>>> can you privately share the source code of your Flink job with me?
>>>
>>> I'm wondering whether the issue might be caused by a version mixup
>>> between different versions on the cluster (different JVM versions? or
>>> different files in the lib/ folder?), How are you deploying the Flink job?
>>>
>>> Regards,
>>> Robert
>>>
>>>
>>> On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier
>>> <[hidden email]> wrote:
>>>>
>>>> I tried to reproduce the error on a subset of the data and actually
>>>> reducing the available memory and increasing a lot the gc (creating a lot of
>>>> useless objects in one of the first UDFs) caused this error:
>>>>
>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>>> terminated due to an exception: / by zero
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>> Caused by: java.lang.ArithmeticException: / by zero
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>> I hope this could help to restrict the debugging area :)
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <[hidden email]> wrote:
>>>>>
>>>>> Hi!
>>>>>
>>>>> That is a pretty thing indeed :-) Will try to look into this in a few
>>>>> days...
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier
>>>>> <[hidden email]> wrote:
>>>>>>
>>>>>> Running the job with log level set to DEBUG made the job run
>>>>>> successfully...Is this meaningful..? Maybe slowing down a little bit the
>>>>>> threads could help serialization?
>>>>>>
>>>>>>
>>>>>> On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier
>>>>>> <[hidden email]> wrote:
>>>>>>>
>>>>>>> Still not able to reproduce the error locally but remotly :)
>>>>>>> Any suggestions about how to try to reproduce it locally on a subset
>>>>>>> of the data?
>>>>>>> This time I had:
>>>>>>>
>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>>>>>>>         at
>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>>         at
>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>         at
>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>         at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>>         at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier
>>>>>>> <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Do you have any suggestions about how to reproduce the error on a
>>>>>>>> subset of the data?
>>>>>>>> I've been changing the following, but I can't find a configuration
>>>>>>>> that causes the error :(
>>>>>>>>
>>>>>>>> private static ExecutionEnvironment getLocalExecutionEnv() {
>>>>>>>>         org.apache.flink.configuration.Configuration c = new
>>>>>>>> org.apache.flink.configuration.Configuration();
>>>>>>>>         c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
>>>>>>>> "/tmp");
>>>>>>>>
>>>>>>>> c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY,"/tmp");
>>>>>>>>         c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
>>>>>>>> 0.9f);
>>>>>>>>         c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>>>>>>>>         c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>>>>>>>>         c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
>>>>>>>>
>>>>>>>> c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
>>>>>>>>         ExecutionEnvironment env =
>>>>>>>> ExecutionEnvironment.createLocalEnvironment(c);
>>>>>>>>         env.setParallelism(16);
>>>>>>>>         env.registerTypeWithKryoSerializer(DateTime.class,
>>>>>>>> JodaDateTimeSerializer.class );
>>>>>>>>         return env;
>>>>>>>>     }
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann
>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>> The error looks really strange. Flavio, could you compile a test
>>>>>>>>> program with example data and configuration to reproduce the problem? Given
>>>>>>>>> that, we could try to debug it.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Flavio Pompermaier
After "some" digging into this problem I'm quite convinced that it is caused by a missing reset of the buffer during Kryo deserialization,
similar to what was fixed by FLINK-2800 (https://github.com/apache/flink/pull/1308/files).
That fix added an output.clear() to the KryoException handling in KryoSerializer.serialize(), but for the deserialization path there's no such call on the Input/NoFetchingInput object (there's a reset() method, but I don't know whether it's the right one to call..).
Do you think that's reasonable?
Could someone help me write a test to check whether this situation is correctly handled by Flink?
I saw, for example, that KryoGenericTypeSerializerTest has a test for the EOFException triggered during deserialization, but it doesn't test what happens when another call is made to the serializer after such an exception occurs (and thus whether the buffers are correctly cleared or not).
I'll start my testing from there for the moment, if nobody has objections..
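
To make the idea concrete, here's a minimal self-contained sketch of the pattern I mean, using plain Kryo (this is not Flink's actual KryoSerializer code, and the reset strategy in the catch block is just my guess):

import java.io.InputStream;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;

// Illustrative only: a Kryo Input reused across deserialize() calls on the
// same stream, plus the reset-on-failure I'd like to try.
public final class ReusedInputDeserializer {
    private final Kryo kryo = new Kryo();
    private InputStream previousStream;
    private Input input;

    public Object deserialize(InputStream stream) {
        if (stream != previousStream) {
            input = new Input(stream); // this buffer serves every record on the stream
            previousStream = stream;
        }
        try {
            return kryo.readClassAndObject(input);
        } catch (KryoException e) {
            // Analog of FLINK-2800's output.clear(), but on the read path:
            // throw away the half-consumed buffer so the next call cannot
            // resume at a bogus offset and misread a class name.
            input = null;
            previousStream = null;
            throw e;
        }
    }
}

What I'm not sure about: with a stream-backed Input, throwing the buffer away also discards bytes that were already read ahead from the stream, so maybe the right fix has to reposition the buffer rather than discard it.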

Best,
Flavio

On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi <[hidden email]> wrote:
Unless someone really invests time into debugging this I fear that the
different misspellings are not really helpful, Flavio.

On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier
<[hidden email]> wrote:
> This time I had the following exception (obviously
> it.okkam.flinj.model.pojo.TipoSoggetto should be
> it.okkam.flink.model.pojo.TipoSoggetto).
>
> java.lang.RuntimeException: Cannot instantiate class.
>       at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>       at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>       at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>       at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>       at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>       at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>       at
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> it.okkam.flinj.model.pojo.TipoSoggetto
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>       ... 10 more
>
>
>
> On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <[hidden email]>
> wrote:
>>
>> Over the last week I've been able to run the job several times without any
>> error. Then I just recompiled it and the error reappeared :(
>> This time I had:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
>> at main(DataInference.java:372)) -> Map (Map at
>> writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error: Error
>> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated
>> due to an exception: Serializer consumed more bytes than the record had.
>> This indicates broken serialization. If you are using custom serialization
>> types (Value or Writable), check their serialization methods. If you are
>> using a Kryo-serialized type, check the corresponding Kryo serializer.
>>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>      at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>      at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>      at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>      at
>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
>>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>      ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>>      at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>      at
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>      at
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>      at
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>      at java.util.ArrayList.elementData(ArrayList.java:418)
>>      at java.util.ArrayList.get(ArrayList.java:431)
>>      at
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>      at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>      at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>      at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>      at
>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>      at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>      ... 5 more
>>
>>
>> I can't really find a way to understand what is causing the error :(
>>
>>
>> On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <[hidden email]>
>> wrote:
>>>
>>> Hi Flavio,
>>>
>>> can you privately share the source code of your Flink job with me?
>>>
>>> I'm wondering whether the issue might be caused by a version mixup
>>> between different versions on the cluster (different JVM versions? or
>>> different files in the lib/ folder?). How are you deploying the Flink job?
>>>
>>> Regards,
>>> Robert
>>>
>>>
>>> On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier
>>> <[hidden email]> wrote:
>>>>
>>>> I tried to reproduce the error on a subset of the data, and actually
>>>> reducing the available memory and greatly increasing GC pressure (creating
>>>> a lot of useless objects in one of the first UDFs) caused this error:
>>>>
>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>>> terminated due to an exception: / by zero
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>> Caused by: java.lang.ArithmeticException: / by zero
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>> I hope this helps to narrow down the debugging area :)
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <[hidden email]> wrote:
>>>>>
>>>>> Hi!
>>>>>
>>>>> That is a pretty thing indeed :-) Will try to look into this in a few
>>>>> days...
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier
>>>>> <[hidden email]> wrote:
>>>>>>
>>>>>> Running the job with log level set to DEBUG made the job run
>>>>>> successfully... Is this meaningful? Maybe slowing down the threads a
>>>>>> little bit could help serialization?
>>>>>>
>>>>>>
>>>>>> On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier
>>>>>> <[hidden email]> wrote:
>>>>>>>
>>>>>>> Still not able to reproduce the error locally, only remotely :)
>>>>>>> Any suggestions about how to reproduce it locally on a subset
>>>>>>> of the data?
>>>>>>> This time I had:
>>>>>>>
>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>>>>>>>         at
>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>>         at
>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>         at
>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>         at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>>         at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier
>>>>>>> <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Do you have any suggestions about how to reproduce the error on a
>>>>>>>> subset of the data?
>>>>>>>> I've been changing the following, but I can't find a configuration
>>>>>>>> that causes the error :(
>>>>>>>>
>>>>>>>> private static ExecutionEnvironment getLocalExecutionEnv() {
>>>>>>>>         org.apache.flink.configuration.Configuration c = new
>>>>>>>> org.apache.flink.configuration.Configuration();
>>>>>>>>         c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
>>>>>>>> "/tmp");
>>>>>>>>
>>>>>>>> c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY,"/tmp");
>>>>>>>>         c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
>>>>>>>> 0.9f);
>>>>>>>>         c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>>>>>>>>         c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>>>>>>>>         c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
>>>>>>>>
>>>>>>>> c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
>>>>>>>>         ExecutionEnvironment env =
>>>>>>>> ExecutionEnvironment.createLocalEnvironment(c);
>>>>>>>>         env.setParallelism(16);
>>>>>>>>         env.registerTypeWithKryoSerializer(DateTime.class,
>>>>>>>> JodaDateTimeSerializer.class );
>>>>>>>>         return env;
>>>>>>>>     }
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann
>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>> The error looks really strange. Flavio, could you compile a test
>>>>>>>>> program with example data and configuration to reproduce the problem? Given
>>>>>>>>> that, we could try to debug it.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Aljoscha Krettek
That's nice. Can you try it on your cluster with an added "reset" call on the buffer?

On Tue, 7 Jun 2016 at 14:35 Flavio Pompermaier <[hidden email]> wrote:
After "some" digging into this problem I'm quite convinced that it is caused by a missing reset of the buffer during Kryo deserialization,
similar to what was fixed by FLINK-2800 (https://github.com/apache/flink/pull/1308/files).
That fix added an output.clear() to the KryoException handling in KryoSerializer.serialize(), but for the deserialization path there's no such call on the Input/NoFetchingInput object (there's a reset() method, but I don't know whether it's the right one to call..).
Do you think that's reasonable?
Could someone help me write a test to check whether this situation is correctly handled by Flink?
I saw, for example, that KryoGenericTypeSerializerTest has a test for the EOFException triggered during deserialization, but it doesn't test what happens when another call is made to the serializer after such an exception occurs (and thus whether the buffers are correctly cleared or not).
I'll start my testing from there for the moment, if nobody has objections..

Best,
Flavio


On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi <[hidden email]> wrote:
Unless someone really invests time into debugging this I fear that the
different misspellings are not really helpful, Flavio.

On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier
<[hidden email]> wrote:
> This time I had the following exception (obviously
> it.okkam.flinj.model.pojo.TipoSoggetto should be
> it.okkam.flink.model.pojo.TipoSoggetto).
>
> java.lang.RuntimeException: Cannot instantiate class.
>       at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>       at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>       at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>       at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>       at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>       at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>       at
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> it.okkam.flinj.model.pojo.TipoSoggetto
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>       ... 10 more
>
>
>
> On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <[hidden email]>
> wrote:
>>
>> Over the last week I've been able to run the job several times without any
>> error. Then I just recompiled it and the error reappeared :(
>> This time I had:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
>> at main(DataInference.java:372)) -> Map (Map at
>> writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error: Error
>> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated
>> due to an exception: Serializer consumed more bytes than the record had.
>> This indicates broken serialization. If you are using custom serialization
>> types (Value or Writable), check their serialization methods. If you are
>> using a Kryo-serialized type, check the corresponding Kryo serializer.
>>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>      at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>      at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>      at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>      at
>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
>>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>      ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>>      at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>      at
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>      at
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>      at
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>      at java.util.ArrayList.elementData(ArrayList.java:418)
>>      at java.util.ArrayList.get(ArrayList.java:431)
>>      at
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>      at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>      at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>      at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>      at
>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>      at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>      ... 5 more
>>
>>
>> I can't really find a way to understand what is causing the error :(
>>
>>
>> On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <[hidden email]>
>> wrote:
>>>
>>> Hi Flavio,
>>>
>>> can you privately share the source code of your Flink job with me?
>>>
>>> I'm wondering whether the issue might be caused by a version mixup
>>> between different versions on the cluster (different JVM versions? or
>>> different files in the lib/ folder?). How are you deploying the Flink job?
>>>
>>> Regards,
>>> Robert
>>>
>>>
>>> On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier
>>> <[hidden email]> wrote:
>>>>
>>>> I tried to reproduce the error on a subset of the data, and actually
>>>> reducing the available memory and greatly increasing GC pressure (creating
>>>> a lot of useless objects in one of the first UDFs) caused this error:
>>>>
>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>>> terminated due to an exception: / by zero
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>> Caused by: java.lang.ArithmeticException: / by zero
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>> I hope this helps to narrow down the debugging area :)
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <[hidden email]> wrote:
>>>>>
>>>>> Hi!
>>>>>
>>>>> That is a pretty thing indeed :-) Will try to look into this in a few
>>>>> days...
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier
>>>>> <[hidden email]> wrote:
>>>>>>
>>>>>> Running the job with log level set to DEBUG made the job run
>>>>>> successfully... Is this meaningful? Maybe slowing down the threads a
>>>>>> little bit could help serialization?
>>>>>>
>>>>>>
>>>>>> On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier
>>>>>> <[hidden email]> wrote:
>>>>>>>
>>>>>>> Still not able to reproduce the error locally, only remotely :)
>>>>>>> Any suggestions about how to reproduce it locally on a subset
>>>>>>> of the data?
>>>>>>> This time I had:
>>>>>>>
>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>>>>>>>         at
>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>>         at
>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>         at
>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>         at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>>         at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier
>>>>>>> <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Do you have any suggestions about how to reproduce the error on a
>>>>>>>> subset of the data?
>>>>>>>> I've been changing the following, but I can't find a configuration
>>>>>>>> that causes the error :(
>>>>>>>>
>>>>>>>> private static ExecutionEnvironment getLocalExecutionEnv() {
>>>>>>>>         org.apache.flink.configuration.Configuration c = new
>>>>>>>> org.apache.flink.configuration.Configuration();
>>>>>>>>         c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
>>>>>>>> "/tmp");
>>>>>>>>
>>>>>>>> c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY,"/tmp");
>>>>>>>>         c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
>>>>>>>> 0.9f);
>>>>>>>>         c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>>>>>>>>         c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>>>>>>>>         c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
>>>>>>>>
>>>>>>>> c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
>>>>>>>>         ExecutionEnvironment env =
>>>>>>>> ExecutionEnvironment.createLocalEnvironment(c);
>>>>>>>>         env.setParallelism(16);
>>>>>>>>         env.registerTypeWithKryoSerializer(DateTime.class,
>>>>>>>> JodaDateTimeSerializer.class );
>>>>>>>>         return env;
>>>>>>>>     }
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann
>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>> The error looks really strange. Flavio, could you compile a test
>>>>>>>>> program with example data and configuration to reproduce the problem? Given
>>>>>>>>> that, we could try to debug it.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Flavio Pompermaier
Hi Aljoscha,
of course I can :)
Thanks for helping me.. do you think calling reset() is the right thing to do?
Actually, I don't know whether this is meaningful or not, but I already ran the job successfully once on the cluster (a second attempt is currently running) after an accidental modification to the KryoException handling in KryoSerializer.deserialize()...
My intention was to reset the input buffer by calling the clear() method on it, so I copied the line from above, but I forgot to change the variable and ended up calling output.clear() instead of input.reset()...
For this reason I say that I don't know if this is meaningful or not...
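
For the test, my current idea is a small probe along these lines (hypothetical code, not an existing Flink test; I feed plain Kryo the literal ^Z^A bytes from the exception message as inter-record garbage):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Hypothetical probe: one stream holding record + garbage + record, one Input
// reused across reads, to see how reads behave after the first failure (the
// "Unable to find class: ^Z^A" symptom).
public class StaleBufferProbe {
    public static void main(String[] args) throws IOException {
        Kryo kryo = new Kryo();
        byte[] record = serialize(kryo, new HashSet<>(Arrays.asList("a", "b")));

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        bytes.write(record);
        bytes.write(new byte[] { 0x1A, 0x01 }); // the ^Z^A bytes from the stack trace
        bytes.write(record);

        Input input = new Input(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(kryo.readClassAndObject(input)); // first record reads fine
        for (int attempt = 0; attempt < 2; attempt++) {
            try {
                System.out.println(kryo.readClassAndObject(input));
            } catch (Exception e) {
                // nothing resets the Input here, so every following read
                // starts at an arbitrary offset inside the second record
                System.out.println("read failed: " + e);
            }
        }
    }

    private static byte[] serialize(Kryo kryo, Object value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Output output = new Output(out);
        kryo.writeClassAndObject(output, value);
        output.close();
        return out.toByteArray();
    }
}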

On Tue, Jun 7, 2016 at 2:50 PM, Aljoscha Krettek <[hidden email]> wrote:
That's nice. Can you try it on your cluster with an added "reset" call on the buffer?

On Tue, 7 Jun 2016 at 14:35 Flavio Pompermaier <[hidden email]> wrote:
After "some" digging into this problem I'm quite convinced that it is caused by a missing reset of the buffer during Kryo deserialization,
similar to what was fixed by FLINK-2800 (https://github.com/apache/flink/pull/1308/files).
That fix added an output.clear() to the KryoException handling in KryoSerializer.serialize(), but for the deserialization path there's no such call on the Input/NoFetchingInput object (there's a reset() method, but I don't know whether it's the right one to call..).
Do you think that's reasonable?
Could someone help me write a test to check whether this situation is correctly handled by Flink?
I saw, for example, that KryoGenericTypeSerializerTest has a test for the EOFException triggered during deserialization, but it doesn't test what happens when another call is made to the serializer after such an exception occurs (and thus whether the buffers are correctly cleared or not).
I'll start my testing from there for the moment, if nobody has objections..

Best,
Flavio


On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi <[hidden email]> wrote:
Unless someone really invests time into debugging this I fear that the
different misspellings are not really helpful, Flavio.

On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier
<[hidden email]> wrote:
> This time I had the following exception (obviously
> it.okkam.flinj.model.pojo.TipoSoggetto should be
> it.okkam.flink.model.pojo.TipoSoggetto).
>
> java.lang.RuntimeException: Cannot instantiate class.
>       at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>       at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>       at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>       at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>       at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>       at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>       at
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> it.okkam.flinj.model.pojo.TipoSoggetto
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>       ... 10 more
>
>
>
> On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <[hidden email]>
> wrote:
>>
>> Over the last week I've been able to run the job several times without any
>> error. Then I just recompiled it and the error reappeared :(
>> This time I had:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
>> at main(DataInference.java:372)) -> Map (Map at
>> writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error: Error
>> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated
>> due to an exception: Serializer consumed more bytes than the record had.
>> This indicates broken serialization. If you are using custom serialization
>> types (Value or Writable), check their serialization methods. If you are
>> using a Kryo-serialized type, check the corresponding Kryo serializer.
>>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>      at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>      at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>      at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>      at
>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
>>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>      ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>>      at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>      at
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>      at
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>      at
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>      at java.util.ArrayList.elementData(ArrayList.java:418)
>>      at java.util.ArrayList.get(ArrayList.java:431)
>>      at
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>      at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>      at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>      at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>      at
>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>      at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>      ... 5 more
>>
>>
>> I can't really find a way to understand what is causing the error :(
>>
>>
>> On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <[hidden email]>
>> wrote:
>>>
>>> Hi Flavio,
>>>
>>> can you privately share the source code of your Flink job with me?
>>>
>>> I'm wondering whether the issue might be caused by a version mixup
>>> between different versions on the cluster (different JVM versions? or
>>> different files in the lib/ folder?). How are you deploying the Flink job?
>>>
>>> Regards,
>>> Robert
>>>
>>>
>>> On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier
>>> <[hidden email]> wrote:
>>>>
>>>> I tried to reproduce the error on a subset of the data, and actually
>>>> reducing the available memory and greatly increasing GC pressure (creating
>>>> a lot of useless objects in one of the first UDFs) caused this error:
>>>>
>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>>> terminated due to an exception: / by zero
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>> Caused by: java.lang.ArithmeticException: / by zero
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>> I hope this helps to narrow down the debugging area :)
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <[hidden email]> wrote:
>>>>>
>>>>> Hi!
>>>>>
>>>>> That is a pretty thing indeed :-) Will try to look into this in a few
>>>>> days...
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier
>>>>> <[hidden email]> wrote:
>>>>>>
>>>>>> Running the job with log level set to DEBUG made the job run
>>>>>> successfully... Is this meaningful? Maybe slowing down the threads a
>>>>>> little bit could help serialization?
>>>>>>
>>>>>>
>>>>>> On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier
>>>>>> <[hidden email]> wrote:
>>>>>>>
>>>>>>> Still not able to reproduce the error locally, only remotely :)
>>>>>>> Any suggestions about how to reproduce it locally on a subset
>>>>>>> of the data?
>>>>>>> This time I had:
>>>>>>>
>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>>>>>>>         at
>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>>         at
>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>         at
>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>         at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>>         at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier
>>>>>>> <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Do you have any suggestions about how to reproduce the error on a
>>>>>>>> subset of the data?
>>>>>>>> I've been changing the following, but I can't find a configuration
>>>>>>>> that causes the error :(
>>>>>>>>
>>>>>>>> private static ExecutionEnvironment getLocalExecutionEnv() {
>>>>>>>>         org.apache.flink.configuration.Configuration c = new
>>>>>>>> org.apache.flink.configuration.Configuration();
>>>>>>>>         c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
>>>>>>>> "/tmp");
>>>>>>>>
>>>>>>>> c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY,"/tmp");
>>>>>>>>         c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
>>>>>>>> 0.9f);
>>>>>>>>         c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>>>>>>>>         c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>>>>>>>>         c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
>>>>>>>>
>>>>>>>> c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
>>>>>>>>         ExecutionEnvironment env =
>>>>>>>> ExecutionEnvironment.createLocalEnvironment(c);
>>>>>>>>         env.setParallelism(16);
>>>>>>>>         env.registerTypeWithKryoSerializer(DateTime.class,
>>>>>>>> JodaDateTimeSerializer.class );
>>>>>>>>         return env;
>>>>>>>>     }
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann
>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>> The error looks really strange. Flavio, could you compile a test
>>>>>>>>> program with example data and configuration to reproduce the problem? Given
>>>>>>>>> that, we could try to debug it.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Reply | Threaded
Open this post in threaded view
|

Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

Flavio Pompermaier
After a second look at the KryoSerializer I fear that Input and Output are never closed.. am I right?
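
If I read the code correctly, both wrap a stream and are Closeable, so I would have expected a close() somewhere in the serializer's lifecycle. Something like this (the file path and names are only for illustration):

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashSet;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;

// Illustration of the lifecycle point: Output (and Input) close the wrapped
// stream on close(); if a serializer keeps them as fields forever, close() is
// never called and the underlying stream is only released by GC.
public class CloseExample {
    public static void main(String[] args) throws IOException {
        try (Output out = new Output(new FileOutputStream("/tmp/records.bin"))) {
            new Kryo().writeClassAndObject(out, new HashSet<String>());
        } // try-with-resources flushes and closes both the Output and the file
    }
}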

On Tue, Jun 7, 2016 at 3:06 PM, Flavio Pompermaier <[hidden email]> wrote:
Hi Aljoscha,
of course I can :)
Thanks for helping me.. do you think calling reset() is the right thing to do?
Actually, I don't know whether this is meaningful or not, but I already ran the job successfully once on the cluster (a second attempt is currently running) after an accidental modification to the KryoException handling in KryoSerializer.deserialize()...
My intention was to reset the input buffer by calling the clear() method on it, so I copied the line from above, but I forgot to change the variable and ended up calling output.clear() instead of input.reset()...
For this reason I say that I don't know if this is meaningful or not...


On Tue, Jun 7, 2016 at 2:50 PM, Aljoscha Krettek <[hidden email]> wrote:
That's nice. Can you try it on your cluster with an added "reset" call on the buffer?

On Tue, 7 Jun 2016 at 14:35 Flavio Pompermaier <[hidden email]> wrote:
After "some" digging into this problem I'm quite convinced that it is caused by a missing reset of the buffer during Kryo deserialization,
similar to what was fixed by FLINK-2800 (https://github.com/apache/flink/pull/1308/files).
That fix added an output.clear() to the KryoException handling in KryoSerializer.serialize(), but for the deserialization path there's no such call on the Input/NoFetchingInput object (there's a reset() method, but I don't know whether it's the right one to call..).
Do you think that's reasonable?
Could someone help me write a test to check whether this situation is correctly handled by Flink?
I saw, for example, that KryoGenericTypeSerializerTest has a test for the EOFException triggered during deserialization, but it doesn't test what happens when another call is made to the serializer after such an exception occurs (and thus whether the buffers are correctly cleared or not).
I'll start my testing from there for the moment, if nobody has objections..

Best,
Flavio


On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi <[hidden email]> wrote:
Unless someone really invests time into debugging this I fear that the
different misspellings are not really helpful, Flavio.

On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier
<[hidden email]> wrote:
> This time I had the following exception (obviously
> it.okkam.flinj.model.pojo.TipoSoggetto should be
> it.okkam.flink.model.pojo.TipoSoggetto).
>
> java.lang.RuntimeException: Cannot instantiate class.
>       at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>       at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>       at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>       at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>       at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>       at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>       at
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> it.okkam.flinj.model.pojo.TipoSoggetto
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>       ... 10 more
>
>
>
> On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <[hidden email]>
> wrote:
>>
>> Over the last week I've been able to run the job several times without any
>> error. Then I just recompiled it and the error reappeared :(
>> This time I had:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
>> at main(DataInference.java:372)) -> Map (Map at
>> writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error: Error
>> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated
>> due to an exception: Serializer consumed more bytes than the record had.
>> This indicates broken serialization. If you are using custom serialization
>> types (Value or Writable), check their serialization methods. If you are
>> using a Kryo-serialized type, check the corresponding Kryo serializer.
>>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>      at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>      at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>      at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>      at
>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
>>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>      ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>>      at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>      at
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>      at
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>      at
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>      at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>      at java.util.ArrayList.elementData(ArrayList.java:418)
>>      at java.util.ArrayList.get(ArrayList.java:431)
>>      at
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>      at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>      at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>      at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>      at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>      at
>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>      at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>      ... 5 more
>>
>>
>> I can't really find a way to understand what is causing the error :(
>>
>>
>> On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <[hidden email]>
>> wrote:
>>>
>>> Hi Flavio,
>>>
>>> can you privately share the source code of your Flink job with me?
>>>
>>> I'm wondering whether the issue might be caused by a version mixup on the
>>> cluster (different JVM versions? different files in the lib/ folder?). How
>>> are you deploying the Flink job?
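>>>
>>> To rule out a JVM mixup quickly, one thing you could run is a trivial job
>>> that reports the JVM version from the TaskManagers, along these lines
>>> (just a sketch, not taken from your code):
>>>
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>
>>> public class JvmVersionCheck {
>>>     public static void main(String[] args) throws Exception {
>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>         // Each record is mapped on some TaskManager, so differing
>>>         // JVM versions should show up in the printed output.
>>>         env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
>>>            .map(new MapFunction<Integer, String>() {
>>>                @Override
>>>                public String map(Integer i) {
>>>                    return "record " + i + " -> JVM " + System.getProperty("java.version");
>>>                }
>>>            })
>>>            .print();
>>>     }
>>> }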
>>>
>>> Regards,
>>> Robert
>>>
>>>
>>> On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier
>>> <[hidden email]> wrote:
>>>>
>>>> I tried to reproduce the error on a subset of the data: reducing the
>>>> available memory and greatly increasing the GC pressure (by creating a lot
>>>> of useless objects in one of the first UDFs) caused this error:
>>>>
>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>>> terminated due to an exception: / by zero
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>> Caused by: java.lang.ArithmeticException: / by zero
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
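>>>>
>>>> The "useless objects" trick is nothing fancy, something along these lines
>>>> (a simplified sketch, not the real UDF):
>>>>
>>>> import java.util.ArrayList;
>>>> import java.util.List;
>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>>
>>>> // Sketch: allocate short-lived garbage for every record to put
>>>> // pressure on the GC while the sorter is running.
>>>> public class GcPressureMapper implements MapFunction<String, String> {
>>>>     @Override
>>>>     public String map(String value) {
>>>>         List<byte[]> junk = new ArrayList<>();
>>>>         for (int i = 0; i < 1000; i++) {
>>>>             junk.add(new byte[1024]); // becomes garbage right away
>>>>         }
>>>>         return value;
>>>>     }
>>>> }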
>>>>
>>>> I hope this helps to narrow down the debugging area :)
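>>>>
>>>> If someone wants to push a local run into the same spill/merge path, it
>>>> might also help to starve the sorter so it spills early and merges many
>>>> small files, using the same config keys as in the snippet I posted earlier
>>>> (the exact values here are just guesses):
>>>>
>>>> // Starve the sorter so it spills early and merges aggressively
>>>> c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.1f);
>>>> c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);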
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <[hidden email]> wrote:
>>>>>
>>>>> Hi!
>>>>>
>>>>> That is a pretty thing indeed :-) Will try to look into this in a few
>>>>> days...
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier
>>>>> <[hidden email]> wrote:
>>>>>>
>>>>>> Running the job with the log level set to DEBUG made the job run
>>>>>> successfully... Is this meaningful? Maybe slowing the threads down a
>>>>>> little helps the serialization?
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


