Unexpected out of bounds error in UnilateralSortMerger

Theodore Vasiloudis
Hello all,

I'm trying to run a job using FlinkML and I'm confused about the source of an error.

The job reads a libSVM formatted file and trains an SVM classifier on it.

I've tried this with small datasets and everything works out fine.

When trying to run the same job on a large dataset (~11GB uncompressed) however, I get the following error:


java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.set(ArrayList.java:444)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
        at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:731)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
        ... 10 more


Any idea what might be causing this? I'm running the job in local mode, 1 TM with 8 slots and ~32GB heap size.

All the vectors created by the libSVM loader have the correct size.
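For readers following the trace: the innermost frames show a plain ArrayList.set call with an index beyond the list's current size. Below is a minimal, self-contained sketch of that failure shape. It assumes a resolver that keeps already-read objects in a list indexed by reference id; the class and method names are hypothetical stand-ins, not Kryo's API, and the sketch does not explain why the id and the list size disagree in the real job.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a reference resolver; names are hypothetical, not Kryo's API. */
public class ReferenceListSketch {
    private final List<Object> seenObjects = new ArrayList<>();

    /** Reserves the next slot and returns its id, as a reader would on first meeting an object. */
    public int nextReadId() {
        seenObjects.add(null);
        return seenObjects.size() - 1;
    }

    /** Stores the object under an id; throws if that id was never reserved. */
    public void setReadObject(int id, Object value) {
        seenObjects.set(id, value);
    }

    /** Returns true if storing under the given id throws IndexOutOfBoundsException. */
    public static boolean failsFor(int reservedSlots, int id) {
        ReferenceListSketch resolver = new ReferenceListSketch();
        for (int i = 0; i < reservedSlots; i++) {
            resolver.nextReadId();
        }
        try {
            resolver.setReadObject(id, new int[0]);
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Two slots reserved, but the id read from the stream is 14:
        // the same shape as "Index: 14, Size: 2" in the trace.
        System.out.println(failsFor(2, 14)); // true
        // An id that was actually reserved is stored without error.
        System.out.println(failsFor(2, 1));  // false
    }
}
```

If this model is roughly right, the exception says that the reader's bookkeeping is out of step with the bytes it is reading, which may point at the serialized stream or the serializer setup rather than at the vector contents.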

Re: Unexpected out of bounds error in UnilateralSortMerger

Stephan Ewen
Hi!

Does this error occur in 0.10 or in 1.0-SNAPSHOT?

It is probably an incorrectly configured Kryo instance (not a problem of the sorter).
What is strange is that it occurs in the "MapReferenceResolver" - there should be no reference resolution during serialization / deserialization.

Can you check what happens when you explicitly register the type SparseVector with the ExecutionEnvironment?

Stephan



Re: Unexpected out of bounds error in UnilateralSortMerger

Theodore Vasiloudis
It's on 0.10.

I've tried explicitly registering SparseVector in my job (this is done anyway by registerFlinkMLTypes, which is called when the SVM predict or evaluate functions are called), but I still get the same error. I will try a couple of different datasets to see whether it's the number of features that is causing this or something else.

So far it works fine for a dataset with 8 features, but the large one has 2000 and I get the above error there. I will try large datasets with a few features and small datasets with many features as well.


Re: Unexpected out of bounds error in UnilateralSortMerger

Theodore Vasiloudis
I haven't been able to reproduce this with other datasets. However, taking a smaller sample from the large dataset I'm using (link to data) causes the same problem.

I'm wondering if the implementation of readLibSVM is what's wrong here. I've tried the new version committed recently by Chiwan, but I still get the same error.

I'll see if I can spot a bug in readLibSVM.
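One way to rule the input data in or out is to check each libSVM line independently of Flink. The sketch below is a hypothetical stand-alone checker, not FlinkML's readLibSVM. It assumes the usual libSVM layout (`label index:value ...` with 1-based, ascending indices), non-empty lines, and no trailing comments.

```java
/** Hypothetical libSVM line check; this is not FlinkML's readLibSVM. */
public class LibSvmLineCheck {

    /** Parsed form of one line: label plus parallel index/value arrays (0-based indices). */
    public static final class Example {
        public final double label;
        public final int[] indices;
        public final double[] values;

        Example(double label, int[] indices, double[] values) {
            this.label = label;
            this.indices = indices;
            this.values = values;
        }
    }

    /** Parses "label idx:val idx:val ..." and shifts the 1-based indices to 0-based. */
    public static Example parse(String line) {
        String[] tokens = line.trim().split("\\s+");
        double label = Double.parseDouble(tokens[0]);
        int[] indices = new int[tokens.length - 1];
        double[] values = new double[tokens.length - 1];
        for (int i = 1; i < tokens.length; i++) {
            String[] pair = tokens[i].split(":");
            indices[i - 1] = Integer.parseInt(pair[0]) - 1;
            values[i - 1] = Double.parseDouble(pair[1]);
        }
        return new Example(label, indices, values);
    }

    /** True if the indices are strictly increasing and all lie in [0, dimension). */
    public static boolean isConsistent(Example example, int dimension) {
        int previous = -1;
        for (int index : example.indices) {
            if (index <= previous || index >= dimension) {
                return false;
            }
            previous = index;
        }
        return true;
    }

    public static void main(String[] args) {
        Example example = parse("+1 3:0.5 14:2.0");
        // Fits a 2000-feature space, but not a 10-feature one.
        System.out.println(isConsistent(example, 2000)); // true
        System.out.println(isConsistent(example, 10));   // false
    }
}
```

Running such a check over the sample that reproduces the error would show whether any line has out-of-range or unordered indices before the data reaches the sorter.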


Re: Unexpected out of bounds error in UnilateralSortMerger

Stephan Ewen
The bug appears to be in the serialization via Kryo while spilling windows. Note that Kryo is used here as a fallback serializer, since SparseVector is not a transparent type to Flink.

I think there are two possible reasons:
  1) Kryo, or our Kryo setup, has an issue here
  2) Kryo is inconsistently configured. There are multiple Kryo instances used across the serializers in the sorter. There may be a bug where they are not initialized in sync.


To check this, can you build Flink with this pull request (https://github.com/apache/flink/pull/1528) or from this branch (https://github.com/StephanEwen/incubator-flink kryo) and see if that fixes it?
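The second hypothesis can be illustrated with a toy model. The sketch below assumes, purely for illustration, that each serializer instance assigns numeric ids to types in registration order. The class is hypothetical and is not Kryo's API; it only shows why two instances that were not initialized identically can disagree about what an id means.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of order-dependent type registration; not Kryo's actual API. */
public class RegistrationOrderSketch {
    private final List<String> registered = new ArrayList<>();

    /** Registers a type name and returns the id it was assigned (its position). */
    public int register(String typeName) {
        registered.add(typeName);
        return registered.size() - 1;
    }

    /** Id that a writer would put on the wire for this type, or -1 if unregistered. */
    public int idOf(String typeName) {
        return registered.indexOf(typeName);
    }

    /** Type that a reader would resolve for an id found on the wire. */
    public String typeFor(int id) {
        return registered.get(id);
    }

    public static void main(String[] args) {
        RegistrationOrderSketch writer = new RegistrationOrderSketch();
        writer.register("SparseVector");
        writer.register("DenseVector");

        // Same types, different registration order.
        RegistrationOrderSketch reader = new RegistrationOrderSketch();
        reader.register("DenseVector");
        reader.register("SparseVector");

        int idOnWire = writer.idOf("SparseVector");
        // The reader resolves the writer's id to a different type.
        System.out.println(reader.typeFor(idOnWire)); // DenseVector
    }
}
```

In this model the data is not corrupt; the two instances simply interpret the same bytes differently, which would be consistent with a failure that only appears once records are written and read back during spilling.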


Thanks,
Stephan






Re: Unexpected out of bounds error in UnilateralSortMerger

Theodore Vasiloudis
OK here's what I tried:

* Build Flink (mvn clean install) from the branch you linked (kryo)
* Build my uber-jar: I use SBT with 1.0-SNAPSHOT as the Flink version, and I added the local Maven repo to the resolvers so that it picks up the previously installed version (I hope)
* Launch local cluster from newly built Flink, try to run job

Still getting the same error.

Is there a way to ensure that SBT is picking up the local version of Flink to build the uber-jar?
Does it matter in this case, or is it enough that I'm sure the launched Flink instance comes from the branch you linked?


Re: Unexpected out of bounds error in UnilateralSortMerger

Till Rohrmann

You could change the version of Stephan’s branch via mvn versions:set -DnewVersion=MyCustomBuildVersion and then mvn versions:commit. Now after you install the Flink binaries you can reference them in your project by setting the version of your Flink dependencies to MyCustomBuildVersion. That way, you are sure that the right dependencies are used.

Alternatively, you could compile an example program with example input data which can reproduce the problem. Then I could also take a look at it.

Cheers,
Till


On Wed, Jan 20, 2016 at 5:58 PM, Theodore Vasiloudis <[hidden email]> wrote:
OK here's what I tried:

* Build Flink (mvn clean install) from the branch you linked (kryo)
* Build my uber-jar, I use SBT with 1.0-SNAPSHOT as the Flink version, added local maven repo to resolvers so that it picks up the previously installed version (I hope)
* Launch local cluster from newly built Flink, try to run job

Still getting the same error.

Is there a way to ensure that SBT is picking up the local version of Flink to build the uber-jar?
Does it matter in this case, or is it enough that I'm sure the launched Flink instance comes from the branch you linked?


On Wed, Jan 20, 2016 at 4:30 PM, Stephan Ewen <[hidden email]> wrote:
The bug looks to be in the serialization via Kryo while spilling windows. Note that Kryo is used here as a fallback serializer, since SparseVector is not a transparent type to Flink.

I think there are two possible reasons:
  1) Kryo, or our Kryo setup has an issue here
  2) Kryo is inconsistently configured. There are multiple Kryo instances used across the serializers in the sorter. There may be a bug that they are not initialized in sync.


To check this, can you build Flink with this pull request (https://github.com/apache/flink/pull/1528) or from this branch (https://github.com/StephanEwen/incubator-flink kryo) and see if that fixes it?


Thanks,
Stephan





On Wed, Jan 20, 2016 at 3:33 PM, Theodore Vasiloudis <[hidden email]> wrote:
I haven't been able to reproduce this with other datasets. However, taking a smaller sample from the large dataset I'm using (link to data) causes the same problem.

I'm wondering if the implementation of readLibSVM is what's wrong here. I've tried the new version committed recently by Chiwan, but I still get the same error.

I'll see if I can spot a bug in readLibSVM.

On Wed, Jan 20, 2016 at 1:43 PM, Theodore Vasiloudis <[hidden email]> wrote:
It's on 0.10.

I've tried explicitly registering SparseVector in my job (which is done anyway by registerFlinkMLTypes, called when the SVM predict or evaluate functions are called), but I still get the same error. I will try a couple of different datasets to see whether it's the number of features that is causing this or something else.

So far it works fine for a dataset with 8 features, but the large one has 2000 and I get the above error there. I will try large datasets with a few features and small datasets with many features as well.

On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <[hidden email]> wrote:
Hi!

Does this error occur in 0.10 or in 1.0-SNAPSHOT?

It is probably an incorrectly configured Kryo instance (not a problem of the sorter).
What is strange is that it occurs in the "MapReferenceResolver" - there should be no reference resolution during serialization / deserialization.

Can you try what happens when you explicitly register the type SparseVector at the ExecutionEnvironment?

Stephan
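For readers following the trace: the innermost frames are `ArrayList.set` called from `MapReferenceResolver.setReadObject`, so the failure has the shape of "store an object under id 14 in a table that currently has 2 slots". The sketch below reproduces only that shape with a plain `ArrayList`. It is not Kryo's code, and the names `ReferenceTableDemo`, `setReadObject`, and `failsForId` are hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ReferenceTableDemo {

    /**
     * Stores an object under the given id in a table of already-read objects,
     * mirroring only the ArrayList.set call seen in the trace.
     */
    static void setReadObject(List<Object> readObjects, int id, Object value) {
        readObjects.set(id, value); // throws if id >= readObjects.size()
    }

    /** Returns true if storing under the id fails with IndexOutOfBoundsException. */
    static boolean failsForId(int tableSize, int id) {
        List<Object> readObjects = new ArrayList<>();
        for (int i = 0; i < tableSize; i++) {
            readObjects.add(null); // placeholder slot, filled in later by set()
        }
        try {
            setReadObject(readObjects, id, "value");
            return false;
        } catch (IndexOutOfBoundsException e) {
            System.out.println("Failed: " + e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsForId(2, 1));  // id inside the table
        System.out.println(failsForId(2, 14)); // id far beyond the table, as in the trace
    }
}
```

An id that far outside the table suggests that the reader's bookkeeping and the bytes it is reading disagree, which is consistent with the suspicion above that the Kryo instance is misconfigured rather than the sorter being at fault.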


On Wed, Jan 20, 2016 at 11:24 AM, Theodore Vasiloudis <[hidden email]> wrote:
Hello all,

I'm trying to run a job using FlinkML and I'm confused about the source of an error.

The job reads a libSVM formatted file and trains an SVM classifier on it.

I've tried this with small datasets and everything works out fine.

When trying to run the same job on a large dataset (~11GB uncompressed) however, I get the following error:


java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.set(ArrayList.java:444)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
        at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:731)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
        ... 10 more


Any idea what might be causing this? I'm running the job in local mode, 1 TM with 8 slots and ~32GB heap size.

All the vectors created by the libSVM loader have the correct size.







Re: Unexpected out of bounds error in UnilateralSortMerger

Theodore Vasiloudis
Alright I will try to do that.

I've tried running the job with a CSV file as input, using DenseVectors to represent the features, and I still get the same IndexOutOfBounds error.

On Wed, Jan 20, 2016 at 6:05 PM, Till Rohrmann <[hidden email]> wrote:

You could change the version of Stephan’s branch via mvn versions:set -DnewVersion=MyCustomBuildVersion and then mvn versions:commit. Now after you install the Flink binaries you can reference them in your project by setting the version of your Flink dependencies to MyCustomBuildVersion. That way, you are sure that the right dependencies are used.

Alternatively, you could compile an example program with example input data which can reproduce the problem. Then I could also take a look at it.

Cheers,
Till




Re: Unexpected out of bounds error in UnilateralSortMerger

Stephan Ewen
Can you again post the stack trace? With the patched branch, the reference mapper should not be used any more (which is where the original exception occurred).

On Wed, Jan 20, 2016 at 7:38 PM, Theodore Vasiloudis <[hidden email]> wrote:
Alright I will try to do that.

I've tried running the job with a CSV file as input, using DenseVectors to represent the features, and I still get the same IndexOutOfBounds error.



Re: Unexpected out of bounds error in UnilateralSortMerger

Theodore Vasiloudis
This is the stack trace from running with the patched branch:

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
        at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:591)
        at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
        at fosdem.SVMClassification$.main(SVMClassification.scala:114)
        at fosdem.SVMClassification.main(SVMClassification.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:796)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:323)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1112)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1160)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1085)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException
        at org.apache.flink.core.memory.HeapMemorySegment.put(HeapMemorySegment.java:128)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:195)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
        at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:266)
        at com.esotericsoftware.kryo.io.Output.writeInt(Output.java:251)
        at com.esotericsoftware.kryo.io.Output.writeInts(Output.java:669)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$IntArraySerializer.write(DefaultArraySerializers.java:63)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$IntArraySerializer.write(DefaultArraySerializers.java:52)
        at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
        ... 9 more
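Compared with the original trace, this one fails on the write path (`ObjectField.write`, `Output.flush`) rather than the read path, and the innermost frame is `HeapMemorySegment.put` rather than `MapReferenceResolver`. The sketch below shows how a bare `ArrayIndexOutOfBoundsException` can arise when bytes are copied past the end of a fixed-size buffer. It assumes the segment is backed by a byte array and written with an array copy. The names `SegmentWriteDemo`, `put`, and `overflows` are hypothetical, and this is not Flink's implementation.

```java
public class SegmentWriteDemo {

    /** Copies length bytes from src into the segment at index, with no extra bounds handling. */
    static void put(byte[] segment, int index, byte[] src, int offset, int length) {
        System.arraycopy(src, offset, segment, index, length);
    }

    /** Returns true if writing length bytes at index runs past the end of the segment. */
    static boolean overflows(int segmentSize, int index, int length) {
        byte[] segment = new byte[segmentSize];
        byte[] src = new byte[length];
        try {
            put(segment, index, src, 0, length);
            return false;
        } catch (IndexOutOfBoundsException e) {
            // System.arraycopy is specified to throw IndexOutOfBoundsException;
            // ArrayIndexOutOfBoundsException is a subclass of it.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(overflows(32, 28, 4)); // last 4 bytes of a 32-byte segment
        System.out.println(overflows(32, 30, 4)); // 2 bytes past the end
    }
}
```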

On Wed, Jan 20, 2016 at 9:45 PM, Stephan Ewen <[hidden email]> wrote:
Can you again post the stack trace? With the patched branch, the reference mapper should not be used any more (which is where the original exception occurred).

On Wed, Jan 20, 2016 at 11:24 AM, Theodore Vasiloudis <[hidden email]> wrote:
Hello all,

I'm trying to run a job using FlinkML and I'm confused about the source of an error.

The job reads a libSVM formatted file and trains an SVM classifier on it.

I've tried this with small datasets and everything works out fine.

When trying to run the same job on a large dataset (~11GB uncompressed) however, I get the following error:


java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.set(ArrayList.java:444)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
        at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:731)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
        ... 10 more


Any idea what might be causing this? I'm running the job in local mode, 1 TM with 8 slots and ~32GB heap size.

All the vectors created by the libSVM loader have the correct size.









Re: Unexpected out of bounds error in UnilateralSortMerger

Theodore Vasiloudis
And this is the one from running with a CSV input. This time I've verified that I'm using the correct version of Flink, following Till's instructions:

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
        at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:591)
        at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
        at fosdem.SVMClassification$.main(SVMClassification.scala:128)
        at fosdem.SVMClassification.main(SVMClassification.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:796)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:323)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1112)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1160)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: scala.collection.immutable.Map$EmptyMap$ cannot be cast to org.apache.flink.ml.math.Vector
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1085)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: scala.collection.immutable.Map$EmptyMap$ cannot be cast to org.apache.flink.ml.math.Vector
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to org.apache.flink.ml.math.Vector
        at org.apache.flink.ml.classification.SVM$$anon$25$$anon$11$$anon$1.createInstance(SVM.scala:353)
        at org.apache.flink.ml.classification.SVM$$anon$25$$anon$11$$anon$1.createInstance(SVM.scala:353)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:114)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:28)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:111)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:104)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:28)
        at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

On Thu, Jan 21, 2016 at 10:51 AM, Theodore Vasiloudis <[hidden email]> wrote:
This is the stack trace from running with the patched branch:

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
        at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:591)
        at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
        at fosdem.SVMClassification$.main(SVMClassification.scala:114)
        at fosdem.SVMClassification.main(SVMClassification.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:796)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:323)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1112)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1160)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1085)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
indices (org.apache.flink.ml.math.SparseVector)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException
        at org.apache.flink.core.memory.HeapMemorySegment.put(HeapMemorySegment.java:128)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:195)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
        at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:266)
        at com.esotericsoftware.kryo.io.Output.writeInt(Output.java:251)
        at com.esotericsoftware.kryo.io.Output.writeInts(Output.java:669)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$IntArraySerializer.write(DefaultArraySerializers.java:63)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$IntArraySerializer.write(DefaultArraySerializers.java:52)
        at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
        ... 9 more

On Wed, Jan 20, 2016 at 9:45 PM, Stephan Ewen <[hidden email]> wrote:
Can you again post the stack trace? With the patched branch, the reference mapper should not be used any more (which is where the original exception occurred).

On Wed, Jan 20, 2016 at 7:38 PM, Theodore Vasiloudis <[hidden email]> wrote:
Alright I will try to do that.

I've tried running the job with a CSV file as input, using DenseVectors to represent the features, and I still get the same IndexOutOfBounds error.

On Wed, Jan 20, 2016 at 6:05 PM, Till Rohrmann <[hidden email]> wrote:

You could change the version of Stephan’s branch via mvn versions:set -DnewVersion=MyCustomBuildVersion and then mvn versions:commit. Now after you install the Flink binaries you can reference them in your project by setting the version of your Flink dependencies to MyCustomBuildVersion. That way, you are sure that the right dependencies are used.

Alternatively, you could compile an example program with example input data which can reproduce the problem. Then I could also take a look at it.

Cheers,
Till


On Wed, Jan 20, 2016 at 5:58 PM, Theodore Vasiloudis <[hidden email]> wrote:
OK here's what I tried:

* Build Flink (mvn clean install) from the branch you linked (kryo)
* Build my uber-jar, I use SBT with 1.0-SNAPSHOT as the Flink version, added local maven repo to resolvers so that it picks up the previously installed version (I hope)
* Launch local cluster from newly built Flink, try to run job

Still getting the same error.

Is there a way to ensure that SBT is picking up the local version of Flink to build the uber-jar?
Does it matter in this case, or is it enough that I'm sure the launched Flink instance comes from the branch you linked?


On Wed, Jan 20, 2016 at 4:30 PM, Stephan Ewen <[hidden email]> wrote:
The bug looks to be in the serialization via Kryo while spilling windows. Note that Kryo is used here as a fallback serializer, since SparseVector is not a transparent type to Flink.

I think there are two possible reasons:
  1) Kryo, or our Kryo setup, has an issue here
  2) Kryo is inconsistently configured. There are multiple Kryo instances used across the serializers in the sorter. There may be a bug that they are not initialized in sync.
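
To make the second hypothesis concrete, here is a minimal sketch of how two serializer instances that were not initialized in sync can disagree about what a class id means. It assumes ids are handed out in registration order; ClassIdRegistrySketch and its methods are hypothetical and are not taken from Kryo or Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical registry that assigns class ids in registration order. */
final class ClassIdRegistrySketch {
    private final Map<Class<?>, Integer> idByClass = new HashMap<>();
    private final List<Class<?>> classById = new ArrayList<>();

    /** Registers a type; the id is simply the number of types seen so far. */
    void register(Class<?> type) {
        if (!idByClass.containsKey(type)) {
            idByClass.put(type, classById.size());
            classById.add(type);
        }
    }

    /** Id a writer would emit for the given type. */
    int idOf(Class<?> type) {
        Integer id = idByClass.get(type);
        if (id == null) {
            throw new IllegalArgumentException("unregistered: " + type.getName());
        }
        return id;
    }

    /** Type a reader would instantiate for the given id. */
    Class<?> classOf(int id) {
        return classById.get(id);
    }

    public static void main(String[] args) {
        // Writer and reader register the same types, but in a different order.
        ClassIdRegistrySketch writer = new ClassIdRegistrySketch();
        writer.register(int[].class);
        writer.register(double[].class);

        ClassIdRegistrySketch reader = new ClassIdRegistrySketch();
        reader.register(double[].class);
        reader.register(int[].class);

        int id = writer.idOf(int[].class);
        System.out.println("writer emits id " + id + " for " + int[].class.getSimpleName());
        System.out.println("reader resolves id " + id + " to " + reader.classOf(id).getSimpleName());
    }
}
```

In this sketch both instances are individually valid, yet bytes written by one are misread by the other. Whether this is what happens in the sorter is exactly what the patched branch is meant to check.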


To check this, can you build Flink with this pull request (https://github.com/apache/flink/pull/1528) or from this branch (https://github.com/StephanEwen/incubator-flink kryo) and see if that fixes it?


Thanks,
Stephan




