Invalid argument reading a file containing a Kryo object


Flavio Pompermaier
Hi to all,
I'm trying to read a file serialized with Kryo, but I get the exception below (createInputSplits creates 8 input splits, of which only one is non-empty).

Caused by: java.io.IOException: Invalid argument
at sun.nio.ch.FileChannelImpl.position0(Native Method)
at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

-----------------------------------------------
My program is basically the following:

public static void main(String[] args) throws Exception {

    ...
    // try-with-resources used to auto-close resources
    try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
        // serialize object
        Kryo kryo = new Kryo();
        kryo.writeClassAndObject(output, myObj);
    } catch (FileNotFoundException ex) {
        LOG.error(ex.getMessage(), ex);
    }

    // deserialize object
    myObj = null;
    try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
        Kryo kryo = new Kryo();
        myObj = (MyClass) kryo.readClassAndObject(input);
    } catch (FileNotFoundException ex) {
        LOG.error(ex.getMessage(), ex);
    }

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);

    Configuration configuration = new Configuration();
    configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64 * 1024 * 1024);

    TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
    final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
    inputFormat.setFilePath("file:/tmp/KryoTest.ser");
    inputFormat.configure(configuration);

    DataSet<MyClass> ds = env.createInput(inputFormat);
    ds.print();
}

private static final class MyClassSerializer extends Serializer<MyClass> {

    @Override
    public void write(Kryo kryo, Output output, MyClass object) {
        kryo.writeClassAndObject(output, object);
    }

    @Override
    public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
        return (MyClass) kryo.readClassAndObject(input);
    }
}

Am I doing something wrong?

Best,
Flavio



Re: Invalid argument reading a file containing a Kryo object

Fabian Hueske-2
If you create your file by just sequentially writing all objects to the file using Kryo, you can only read it with a parallelism of 1.
Writing binary files in a way that they can be read in parallel is a bit tricky (and not specific to Flink).


Re: Invalid argument reading a file containing a Kryo object

Flavio Pompermaier
I also tried with

DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);

but I get the same error :(

Moreover, in this example I put exactly one object per file so it should be able to deserialize it, right?


Re: Invalid argument reading a file containing a Kryo object

Fabian Hueske-2
This might be an issue with the blockSize parameter of the BinaryInputFormat.
How large is the file with the single object?


Re: Invalid argument reading a file containing a Kryo object

Flavio Pompermaier
The file containing the serialized object is 7 bytes.


Re: Invalid argument reading a file containing a Kryo object

Fabian Hueske-2
Ah, I checked the code.

The BinaryInputFormat expects metadata which is written by the BinaryOutputFormat.
So you cannot use the BinaryInputFormat to read a file which does not provide that metadata.
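One way around this would be to let Flink write the file in the first place, using TypeSerializerOutputFormat, the writing counterpart of the TypeSerializerInputFormat from the original program. A rough, untested sketch (assuming a DataSet<MyClass> named ds and the environment env already exist):

```java
// Sketch (untested): write the DataSet with TypeSerializerOutputFormat so
// that the block metadata BinaryInputFormat expects is present in the file.
TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);

TypeSerializerOutputFormat<MyClass> outputFormat = new TypeSerializerOutputFormat<>();
outputFormat.setSerializer(typeInfo.createSerializer(env.getConfig()));
outputFormat.setOutputFilePath(new Path("file:/tmp/KryoTest.ser"));

ds.output(outputFormat);
env.execute();

// Afterwards the file can be read back with a TypeSerializerInputFormat,
// since the block headers are now in place.
```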


Re: Invalid argument reading a file containing a Kryo object

Flavio Pompermaier
So what should I do?


Re: Invalid argument reading a file containing a Kryo object

Fabian Hueske-2
You could implement your own InputFormat based on FileInputFormat and override the createInputSplits() method to create just a single split per file.
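A minimal, untested sketch of that suggestion (hypothetical class name, API as of Flink 0.9): instead of overriding createInputSplits() directly, FileInputFormat's protected unsplittable flag already has the effect of producing exactly one split per file, and open()/nextRecord() then deserialize the single Kryo object:

```java
// Hypothetical sketch: an InputFormat that treats each file as one split
// containing exactly one Kryo-serialized object.
public class KryoObjectInputFormat extends FileInputFormat<MyClass> {

    private transient Kryo kryo;
    private boolean objectRead;

    public KryoObjectInputFormat() {
        // FileInputFormat honors this flag and creates a single split per
        // file, which has the same effect as overriding createInputSplits().
        this.unsplittable = true;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split); // opens this.stream at the start of the file
        kryo = new Kryo();
        objectRead = false;
    }

    @Override
    public boolean reachedEnd() {
        return objectRead;
    }

    @Override
    public MyClass nextRecord(MyClass reuse) throws IOException {
        // this.stream is a plain InputStream, so Kryo can wrap it directly
        Input input = new Input(this.stream);
        objectRead = true;
        return (MyClass) kryo.readClassAndObject(input);
    }
}
```

Creating the Kryo instance in open(), which runs on the worker, also sidesteps the fact that Kryo itself is not serializable and so cannot be shipped as a field of the format.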


Re: Invalid argument reading a file containing a Kryo object

Flavio Pompermaier
Would that also be the way to go for recursively reading an entire directory that contains one object per file?


Re: Invalid argument reading a file containing a Kryo object

Fabian Hueske-2
I don't know your use case.
The InputFormat interface is very flexible. Directories can be recursively read. A file can contain one or more objects. You can also make a smarter IF and put multiple (small) files into one split...

It is up to your use case what you need to implement.


2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <[hidden email]>:
Would this also be the case when just recursively reading an entire directory containing one object per file?

On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <[hidden email]> wrote:
You could implement your own InputFormat based on FileInputFormat and overwrite the createInputSplits method to just create a single split per file.

2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <[hidden email]>:
So what should I do?

On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <[hidden email]> wrote:
Ah, I checked the code.

The BinaryInputFormat expects metadata which is written by the BinaryOutputFormat.
So you cannot use the BinaryInputFormat to read a file that does not provide that metadata.

2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <[hidden email]>:
The file containing the serialized object is 7 bytes

On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <[hidden email]> wrote:
This might be an issue with the blockSize parameter of the BinaryInputFormat.
How large is the file with the single object?

2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <[hidden email]>:
I also tried with

DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);

but I get the same error :(

Moreover, in this example I put exactly one object per file so it should be able to deserialize it, right?

On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <[hidden email]> wrote:
If you create your file by just sequentially writing all objects to the file using Kryo, you can only read it with a parallelism of 1.
Writing binary files in a way that they can be read in parallel is a bit tricky (and not specific to Flink).
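The block-based layout that makes binary files readable in parallel can be sketched in plain Java. This is a toy illustration of the general idea (not Flink's actual BinaryOutputFormat code, and the `BlockFile` class and its constants are made up for the example): every fixed-size block starts with a record count, so a reader can seek to any block boundary and decode that block without any context from earlier bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class BlockFile {
    // Toy block layout: 4-byte record count, then length-prefixed records,
    // zero-padded to BLOCK_SIZE so a reader can seek to i * BLOCK_SIZE independently.
    static final int BLOCK_SIZE = 64;

    static byte[] write(List<byte[]> records) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ByteBuffer body = ByteBuffer.allocate(BLOCK_SIZE - 4); // record area of one block
        int count = 0;
        for (byte[] r : records) {
            if (body.position() + 4 + r.length > body.capacity()) { // block full: flush it
                flush(out, body, count);
                count = 0;
            }
            body.putInt(r.length);
            body.put(r);
            count++;
        }
        if (count > 0) {
            flush(out, body, count);
        }
        return out.toByteArray();
    }

    private static void flush(ByteArrayOutputStream out, ByteBuffer body, int count) {
        byte[] block = new byte[BLOCK_SIZE];
        ByteBuffer buf = ByteBuffer.wrap(block);
        buf.putInt(count);                         // block header: number of records
        buf.put(body.array(), 0, body.position()); // record data; rest stays zero-padded
        out.write(block, 0, block.length);
        body.clear();
    }

    // A "split" starting at block i needs no information from earlier blocks.
    static List<byte[]> readBlock(byte[] file, int blockIndex) {
        ByteBuffer buf = ByteBuffer.wrap(file, blockIndex * BLOCK_SIZE, BLOCK_SIZE);
        int count = buf.getInt();
        List<byte[]> records = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            byte[] r = new byte[buf.getInt()];
            buf.get(r);
            records.add(r);
        }
        return records;
    }
}
```

A file written by raw sequential Kryo writes has no such headers, which is why only a single reader starting at byte 0 can make sense of it.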

2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <[hidden email]>:
Hi to all,
I;m trying to read a file serialized with kryo but I get this exception (due to the fact that the createInputSplits creates 8 inputsplits, where just one is not empty..).

Caused by: java.io.IOException: Invalid argument
at sun.nio.ch.FileChannelImpl.position0(Native Method)
at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

-----------------------------------------------
My program is basically the following:

public static void main(String[] args) throws Exception {

...
//try-with-resources used to autoclose resources
try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
//serialise object
Kryo kryo=new Kryo();
kryo.writeClassAndObject(output, myObj);
} catch (FileNotFoundException ex) {
LOG.error(ex.getMessage(), ex);
}

//deserialise object

myObj=null;

try (Input input = new Input( new FileInputStream("/tmp/KryoTest.ser"))){
    Kryo kryo=new Kryo();
    myObj =(MyClass)kryo.readClassAndObject(input);
} catch (FileNotFoundException ex) {
LOG.error(ex.getMessage(), ex);
}


final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
Configuration configuration = new Configuration();
configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);

TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
inputFormat.setFilePath("file:/tmp/KryoTest.ser");
inputFormat.configure(configuration);

DataSet<MyClass> ds = env.createInput(inputFormat);
ds.print();

}

private static final class MyClassSerializer extends Serializer<MyClass> {

@Override
public void write(Kryo kryo, Output output, MyClass object) {
kryo.writeClassAndObject(output, object);
}

@Override
public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
return (MyClass) kryo.readClassAndObject(input);
}
}

Am I doing something wrong?

Best,
Flavio













Reply | Threaded
Open this post in threaded view
|

Re: Invalid argument reading a file containing a Kryo object

Flavio Pompermaier
I have a directory containing a list of files, each one containing a Kryo-serialized object.
With JSON-serialized objects I don't have that problem (but there I use env.readTextFile(path).withParameters(parameters),
where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).


Re: Invalid argument reading a file containing a Kryo object

Fabian Hueske-2
Enumeration of nested files is a feature of the FileInputFormat.
If you implement your own IF based on FileInputFormat as I suggested before, you can use that feature.
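The nested-file enumeration Fabian refers to boils down to a recursive directory walk. A minimal sketch in plain Java of the same idea (a hypothetical helper for illustration, not Flink's FileInputFormat code): collect every regular file under a root so a custom IF can emit one split per file.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class NestedFiles {
    // Recursively collect all regular files under 'dir'; a custom InputFormat
    // could then create exactly one FileInputSplit per returned file.
    static List<File> enumerate(File dir) {
        List<File> files = new ArrayList<>();
        File[] entries = dir.listFiles();
        if (entries == null) {
            return files; // not a directory, or it could not be read
        }
        for (File e : entries) {
            if (e.isDirectory()) {
                files.addAll(enumerate(e)); // descend into nested directory
            } else {
                files.add(e);
            }
        }
        return files;
    }
}
```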


Re: Invalid argument reading a file containing a Kryo object

Flavio Pompermaier
Sorry Fabian but I don't understand what I should do :(
Could you provide a simple code snippet to achieve this?


Re: Invalid argument reading a file containing a Kryo object

Fabian Hueske-2
You need to do something like this:

public class YourInputFormat extends FileInputFormat<Object> {
   
   private boolean objectRead;
   
   @Override
   public FileInputSplit[] createInputSplits(int minNumSplits) {
      // Create one FileInputSplit for each file you want to read.
      // Check FileInputFormat for how to recursively enumerate files.
      // Input splits must start at 0 and have a length equal to length of the file to read.
      return null;
   }
   
   @Override
   public void open(FileInputSplit split) throws IOException {
      super.open(split);
      objectRead = false;
   }
   
   @Override
   public boolean reachedEnd() throws IOException {
      return this.objectRead;
   }

   @Override
   public Object nextRecord(Object reuse) throws IOException {
      Object yourObject = this.stream.read(); // use Kryo here to read from this.stream()
      this.objectRead = true; // read only one object
      return yourObject;
   }
}
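The read side of the skeleton above can be tried out without Flink at all. Here is a minimal standalone version of the same one-object-per-split pattern, using JDK serialization as a stand-in for Kryo (the `OneObjectReader` class and its method names are illustrative, not part of any API):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;

// One object per file/split: nextRecord() deserializes exactly once and
// flips a flag, so reachedEnd() terminates the split after a single record.
public class OneObjectReader {
    private ObjectInputStream stream;
    private boolean objectRead;

    public void open(InputStream in) throws IOException {
        this.stream = new ObjectInputStream(in); // fresh stream per split
        this.objectRead = false;
    }

    public boolean reachedEnd() {
        return objectRead;
    }

    public Object nextRecord() throws IOException, ClassNotFoundException {
        Object obj = stream.readObject(); // Kryo's readClassAndObject would go here
        objectRead = true;                // only one object per split
        return obj;
    }
}
```

In the Flink version, open()/reachedEnd()/nextRecord() are the FileInputFormat callbacks and the stream comes from the FileInputSplit.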


Re: Invalid argument reading a file containing a Kryo object

Flavio Pompermaier
Hi Fabian,
thanks to your help I finally managed to generate a DataSet from my folder, but I think there are some inconsistencies in the hierarchy of InputFormats.
The BinaryInputFormat/TypeSerializerInputFormat should somehow inherit the behaviour of the FileInputFormat (i.e., respect unsplittable and enumerateNestedFiles), but they don't take those flags into account.
Moreover, in the TypeSerializerInputFormat there's a "// TODO: fix this shit" comment that should probably be removed or fixed :)

Also, having to keep testForUnsplittable and decorateInputStream aligned by hand is somewhat dangerous..
And maybe the visibility of getBlockIndexForPosition should be changed to protected?

So basically, my need was to implement a TypeSerializerInputFormat<RowBundle>, but to achieve that I had to override a lot of methods.. am I doing something wrong, or could those InputFormats be improved? This is my IF code (remark: from the comment "Copied from FileInputFormat (override TypeSerializerInputFormat)" onwards, the code is copied and pasted from FileInputFormat.. thus MY code ends there):

public class RowBundleInputFormat extends TypeSerializerInputFormat<RowBundle> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RowBundleInputFormat.class);

    /** The fraction that the last split may be larger than the others. */
    private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
    private boolean objectRead;

    public RowBundleInputFormat() {
        super(new GenericTypeInfo<>(RowBundle.class));
        unsplittable = true;
    }

    @Override
    protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream, FileInputSplit fileSplit) throws Throwable {
        return inputStream;
    }

    @Override
    protected boolean testForUnsplittable(FileStatus pathFile) {
        return true;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        objectRead = false;
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return this.objectRead;
    }

    @Override
    public RowBundle nextRecord(RowBundle reuse) throws IOException {
        RowBundle yourObject = super.nextRecord(reuse);
        this.objectRead = true; // read only one object
        return yourObject;
    }

// -------------------------------------------------------------------
// Copied from FileInputFormat (override TypeSerializerInputFormat)
// -------------------------------------------------------------------
@Override
public FileInputSplit[] createInputSplits(int minNumSplits)
throws IOException {
if (minNumSplits < 1) {
throw new IllegalArgumentException(
"Number of input splits has to be at least 1.");
}

// take the desired number of splits into account
minNumSplits = Math.max(minNumSplits, this.numSplits);

final Path path = this.filePath;
final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(
minNumSplits);

// get all the files that are involved in the splits
List<FileStatus> files = new ArrayList<FileStatus>();
long totalLength = 0;

final FileSystem fs = path.getFileSystem();
final FileStatus pathFile = fs.getFileStatus(path);

if (pathFile.isDir()) {
// input is directory. list all contained files
final FileStatus[] dir = fs.listStatus(path);
for (int i = 0; i < dir.length; i++) {
if (dir[i].isDir()) {
if (enumerateNestedFiles) {
if (acceptFile(dir[i])) {
totalLength += addNestedFiles(dir[i].getPath(),
files, 0, true);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Directory "
+ dir[i].getPath().toString()
+ " did not pass the file-filter and is excluded.");
}
}
}
} else {
if (acceptFile(dir[i])) {
files.add(dir[i]);
totalLength += dir[i].getLen();
// as soon as there is one deflate file in a directory,
// we can not split it
testForUnsplittable(dir[i]);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("File "
+ dir[i].getPath().toString()
+ " did not pass the file-filter and is excluded.");
}
}
}
}
} else {
testForUnsplittable(pathFile);

files.add(pathFile);
totalLength += pathFile.getLen();
}
// returns if unsplittable
if (unsplittable) {
int splitNum = 0;
for (final FileStatus file : files) {
final BlockLocation[] blocks = fs.getFileBlockLocations(file,
0, file.getLen());
Set<String> hosts = new HashSet<String>();
for (BlockLocation block : blocks) {
hosts.addAll(Arrays.asList(block.getHosts()));
}
long len = file.getLen();
if (testForUnsplittable(file)) {
len = READ_WHOLE_SPLIT_FLAG;
}
FileInputSplit fis = new FileInputSplit(splitNum++,
file.getPath(), 0, len, hosts.toArray(new String[hosts
.size()]));
inputSplits.add(fis);
}
return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
}

final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE
: (totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0
: 1));

// now that we have the files, generate the splits
int splitNum = 0;
for (final FileStatus file : files) {

final long len = file.getLen();
final long blockSize = file.getBlockSize();

final long minSplitSize;
if (this.minSplitSize <= blockSize) {
minSplitSize = this.minSplitSize;
} else {
if (LOG.isWarnEnabled()) {
LOG.warn("Minimal split size of " + this.minSplitSize
+ " is larger than the block size of " + blockSize
+ ". Decreasing minimal split size to block size.");
}
minSplitSize = blockSize;
}

final long splitSize = Math.max(minSplitSize,
Math.min(maxSplitSize, blockSize));
final long halfSplit = splitSize >>> 1;

final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);

if (len > 0) {

// get the block locations and make sure they are in order with
// respect to their offset
final BlockLocation[] blocks = fs.getFileBlockLocations(file,
0, len);
Arrays.sort(blocks);

long bytesUnassigned = len;
long position = 0;

int blockIndex = 0;

while (bytesUnassigned > maxBytesForLastSplit) {
// get the block containing the majority of the data
blockIndex = getBlockIndexForPosition(blocks, position,
halfSplit, blockIndex);
// create a new split
FileInputSplit fis = new FileInputSplit(splitNum++,
file.getPath(), position, splitSize,
blocks[blockIndex].getHosts());
inputSplits.add(fis);

// adjust the positions
position += splitSize;
bytesUnassigned -= splitSize;
}

// assign the last split
if (bytesUnassigned > 0) {
blockIndex = getBlockIndexForPosition(blocks, position,
halfSplit, blockIndex);
final FileInputSplit fis = new FileInputSplit(splitNum++,
file.getPath(), position, bytesUnassigned,
blocks[blockIndex].getHosts());
inputSplits.add(fis);
}
} else {
// special case with a file of zero bytes size
final BlockLocation[] blocks = fs.getFileBlockLocations(file,
0, 0);
String[] hosts;
if (blocks.length > 0) {
hosts = blocks[0].getHosts();
} else {
hosts = new String[0];
}
final FileInputSplit fis = new FileInputSplit(splitNum++,
file.getPath(), 0, 0, hosts);
inputSplits.add(fis);
}
}

return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
}

/**
* Recursively traverse the input directory structure and enumerate all
* accepted nested files.
* @return the total length of accepted files.
*/
private long addNestedFiles(Path path, List<FileStatus> files, long length,
boolean logExcludedFiles) throws IOException {
final FileSystem fs = path.getFileSystem();

for (FileStatus dir : fs.listStatus(path)) {
if (dir.isDir()) {
if (acceptFile(dir)) {
addNestedFiles(dir.getPath(), files, length,
logExcludedFiles);
} else {
if (logExcludedFiles && LOG.isDebugEnabled()) {
LOG.debug("Directory "
+ dir.getPath().toString()
+ " did not pass the file-filter and is excluded.");
}
}
} else {
if (acceptFile(dir)) {
files.add(dir);
length += dir.getLen();
testForUnsplittable(dir);
} else {
if (logExcludedFiles && LOG.isDebugEnabled()) {
LOG.debug("Directory "
+ dir.getPath().toString()
+ " did not pass the file-filter and is excluded.");
}
}
}
}
return length;
}

/**
* Retrieves the index of the <tt>BlockLocation</tt> that contains the part
* of the file described by the given offset.
* @param blocks
*            The different blocks of the file. Must be ordered by their
*            offset.
* @param offset
*            The offset of the position in the file.
* @param halfSplitSize
*            Half of the target split size; if fewer than this many bytes of
*            the current block remain after the offset, the next block is
*            returned instead.
* @param startIndex
*            The earliest index to look at.
* @return The index of the block containing the given position.
*/
private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
long halfSplitSize, int startIndex) {
// go over all indexes after the startIndex
for (int i = startIndex; i < blocks.length; i++) {
long blockStart = blocks[i].getOffset();
long blockEnd = blockStart + blocks[i].getLength();

if (offset >= blockStart && offset < blockEnd) {
// got the block where the split starts
// check if the next block contains more than this one does
if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
return i + 1;
} else {
return i;
}
}
}
throw new IllegalArgumentException("The given offset is not contained in any block.");
}

}
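For reference, the block-selection rule in getBlockIndexForPosition above (advance to the next block when less than half a split's worth of bytes remains in the current one) can be exercised in isolation. This is a minimal, self-contained sketch using plain offset/length pairs instead of Flink's BlockLocation; the class and field names here are illustrative only:

```java
public class BlockIndexDemo {

    /** Simplified stand-in for BlockLocation: covers [offset, offset + length). */
    static final class Block {
        final long offset, length;
        Block(long offset, long length) { this.offset = offset; this.length = length; }
    }

    /**
     * Same selection rule as in the InputFormat above: return the index of the
     * block containing `offset`, but move on to the next block when fewer than
     * halfSplitSize bytes of the current block remain after the offset.
     */
    static int blockIndexForPosition(Block[] blocks, long offset, long halfSplitSize, int startIndex) {
        for (int i = startIndex; i < blocks.length; i++) {
            long blockStart = blocks[i].offset;
            long blockEnd = blockStart + blocks[i].length;
            if (offset >= blockStart && offset < blockEnd) {
                if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
                    return i + 1;  // the majority of the split falls into the next block
                }
                return i;
            }
        }
        throw new IllegalArgumentException("The given offset is not contained in any block.");
    }

    public static void main(String[] args) {
        Block[] blocks = { new Block(0, 100), new Block(100, 100) };
        // offset 10, half-split 50: 90 bytes remain in block 0 -> stay at index 0
        System.out.println(blockIndexForPosition(blocks, 10, 50, 0));  // 0
        // offset 80, half-split 50: only 20 bytes remain in block 0 -> advance to 1
        System.out.println(blockIndexForPosition(blocks, 80, 50, 0));  // 1
    }
}
```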




On Sun, Aug 9, 2015 at 2:00 PM, Fabian Hueske <[hidden email]> wrote:
You need to do something like this:

public class YourInputFormat extends FileInputFormat<Object> {
   
   private boolean objectRead;
   
   @Override
   public FileInputSplit[] createInputSplits(int minNumSplits) {
      // Create one FileInputSplit for each file you want to read.
      // Check FileInputFormat for how to recursively enumerate files.
      // Input splits must start at 0 and have a length equal to the length of the file to read.
      return null;
   }
   
   @Override
   public void open(FileInputSplit split) throws IOException {
      super.open(split);
      objectRead = false;
   }
   
   @Override
   public boolean reachedEnd() throws IOException {
      return this.objectRead;
   }

   @Override
   public Object nextRecord(Object reuse) throws IOException {
      Object yourObject = this.stream.read(); // use Kryo here to read from this.stream()
      this.objectRead = true; // read only one object
      return yourObject;
   }
}
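The open/reachedEnd/nextRecord contract sketched above boils down to: reset a flag per split, read exactly one object, then report end-of-input. A minimal, self-contained sketch of that contract, with Kryo swapped for a plain DataInputStream so it runs standalone (the class and method names are illustrative, not Flink API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class OneRecordPerSplit {

    private DataInputStream stream;
    private boolean objectRead;

    /** Corresponds to open(FileInputSplit): reset the per-split flag. */
    void open(InputStream in) {
        this.stream = new DataInputStream(in);
        this.objectRead = false;
    }

    /** Corresponds to reachedEnd(): true once the single object was consumed. */
    boolean reachedEnd() {
        return objectRead;
    }

    /** Corresponds to nextRecord(): deserialize one object, then flag the end. */
    String nextRecord() throws IOException {
        String value = stream.readUTF();  // stand-in for kryo.readClassAndObject(input)
        objectRead = true;                // read only one object per split
        return value;
    }

    public static void main(String[] args) throws IOException {
        // Simulate a split file that contains exactly one serialized record.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        new DataOutputStream(buf).writeUTF("myObj");

        OneRecordPerSplit format = new OneRecordPerSplit();
        format.open(new ByteArrayInputStream(buf.toByteArray()));
        while (!format.reachedEnd()) {
            System.out.println(format.nextRecord());  // prints "myObj" exactly once
        }
    }
}
```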

2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <[hidden email]>:
Sorry Fabian but I don't understand what I should do :(
Could you provide me a simple snippet of code to achieve this?

On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <[hidden email]> wrote:
Enumeration of nested files is a feature of the FileInputFormat.
If you implement your own IF based on FileInputFormat as I suggested before, you can use that feature.

2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <[hidden email]>:
I have a directory containing a list of files, each one containing a kryo-serialized object.
With json serialized objects I don't have that problem (but there I use env.readTextFile(path).withParameters(parameters),
where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).

On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <[hidden email]> wrote:
I don't know your use case.
The InputFormat interface is very flexible. Directories can be recursively read. A file can contain one or more objects. You can also make a smarter IF and put multiple (small) files into one split...

It is up to your use case what you need to implement.


2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <[hidden email]>:
Should this be the case just reading recursively an entire directory containing one object per file?

On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <[hidden email]> wrote:
You could implement your own InputFormat based on FileInputFormat and overwrite the createInputSplits method to just create a single split per file.

2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <[hidden email]>:
So what should I do?

On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <[hidden email]> wrote:
Ah, I checked the code.

The BinaryInputFormat expects metadata which is written by the BinaryOutputFormat.
So you cannot use the BinaryInputFormat to read a file which does not provide the metadata.

2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <[hidden email]>:
The file containing the serialized object is 7 bytes

On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <[hidden email]> wrote:
This might be an issue with the blockSize parameter of the BinaryInputFormat.
How large is the file with the single object?

2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <[hidden email]>:
I also tried with

DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);

but I get the same error :(

Moreover, in this example I put exactly one object per file so it should be able to deserialize it, right?

On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <[hidden email]> wrote:
If you create your file by just sequentially writing all objects to the file using Kryo, you can only read it with a parallelism of 1.
Writing binary files in a way that they can be read in parallel is a bit tricky (and not specific to Flink).
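One common way to make such a file readable in parallel is to frame each record, for example with a fixed-size length prefix, so a reader can find record boundaries without deserializing everything before its split. A minimal sketch of that idea with DataOutputStream/DataInputStream (this is not what BinaryOutputFormat actually writes; its block metadata is richer):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LengthPrefixedRecords {

    /** Write each record as a 4-byte length followed by the payload bytes. */
    static byte[] writeRecords(List<byte[]> records) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        for (byte[] record : records) {
            out.writeInt(record.length);
            out.write(record);
        }
        return buf.toByteArray();
    }

    /** Read records back; a reader positioned at any record boundary can start there. */
    static List<byte[]> readRecords(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        List<byte[]> records = new ArrayList<>();
        while (in.available() > 0) {
            byte[] record = new byte[in.readInt()];
            in.readFully(record);
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = writeRecords(Arrays.asList("a".getBytes(), "bc".getBytes()));
        for (byte[] record : readRecords(data)) {
            System.out.println(new String(record));  // prints "a" then "bc"
        }
    }
}
```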

--

Flavio Pompermaier
Development Department
_______________________________________________
OKKAMSrl www.okkam.it

Phone: +(39) 0461 283 702
Fax: + (39) 0461 186 6433
Email: [hidden email]
Headquarters: Trento (Italy), via G.B. Trener 8
Registered office: Trento (Italy), via Segantini 23

Confidentiality notice. This e-mail transmission may contain legally privileged and/or confidential information. Please do not read it if you are not the intended recipient(s). Any use, distribution, reproduction or disclosure by any other person is strictly prohibited. If you have received this e-mail in error, please notify the sender and destroy the original transmission and its attachments without reading or saving it in any manner.


Re: Invalid argument reading a file containing a Kryo object

Fabian Hueske-2
Congrats that you got your InputFormat working!
It is true, there can be a few inconsistencies in the Formats derived from FileInputFormat.

It would be great if you could open JIRAs for these issues. Otherwise, they might get lost on the mailing list.

Thanks, Fabian



Re: Invalid argument reading a file containing a Kryo object

Flavio Pompermaier
Done through https://issues.apache.org/jira/browse/FLINK-2503

Thanks again,
Flavio

blocks[blockIndex].getHosts());
inputSplits.add(fis);
}
} else {
// special case with a file of zero bytes size
final BlockLocation[] blocks = fs.getFileBlockLocations(file,
0, 0);
String[] hosts;
if (blocks.length > 0) {
hosts = blocks[0].getHosts();
} else {
hosts = new String[0];
}
final FileInputSplit fis = new FileInputSplit(splitNum++,
file.getPath(), 0, 0, hosts);
inputSplits.add(fis);
}
}

return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
}

/**
* Recursively traverse the input directory structure and enumerate all
* accepted nested files.
* @return the total length of accepted files.
*/
private long addNestedFiles(Path path, List<FileStatus> files, long length,
boolean logExcludedFiles) throws IOException {
final FileSystem fs = path.getFileSystem();

for (FileStatus dir : fs.listStatus(path)) {
if (dir.isDir()) {
if (acceptFile(dir)) {
addNestedFiles(dir.getPath(), files, length,
logExcludedFiles);
} else {
if (logExcludedFiles && LOG.isDebugEnabled()) {
LOG.debug("Directory "
+ dir.getPath().toString()
+ " did not pass the file-filter and is excluded.");
}
}
} else {
if (acceptFile(dir)) {
files.add(dir);
length += dir.getLen();
testForUnsplittable(dir);
} else {
if (logExcludedFiles && LOG.isDebugEnabled()) {
LOG.debug("File "
+ dir.getPath().toString()
+ " did not pass the file-filter and is excluded.");
}
}
}
}
return length;
}

/**
* Retrieves the index of the <tt>BlockLocation</tt> that contains the part
* of the file described by the given offset.
* @param blocks
*            The different blocks of the file. Must be ordered by their
*            offset.
* @param offset
*            The offset of the position in the file.
* @param halfSplitSize
*            Half of the split size. If fewer than this many bytes of the
*            matched block remain after the offset, the next block is
*            returned instead, as it will hold the majority of the split.
* @param startIndex
*            The earliest index to look at.
* @return The index of the block containing the given position.
*/
private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
long halfSplitSize, int startIndex) {
// go over all indexes after the startIndex
for (int i = startIndex; i < blocks.length; i++) {
long blockStart = blocks[i].getOffset();
long blockEnd = blockStart + blocks[i].getLength();

if (offset >= blockStart && offset < blockEnd) {
// got the block where the split starts
// check if the next block contains more than this one does
if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
return i + 1;
} else {
return i;
}
}
}
throw new IllegalArgumentException("The given offset is not contained in any block.");
}

}




On Sun, Aug 9, 2015 at 2:00 PM, Fabian Hueske <[hidden email]> wrote:
You need to do something like this:

public class YourInputFormat extends FileInputFormat<Object> {
   
   private boolean objectRead;
   
   @Override
   public FileInputSplit[] createInputSplits(int minNumSplits) {
      // Create one FileInputSplit for each file you want to read.
      // Check FileInputFormat for how to recursively enumerate files.
      // Input splits must start at 0 and have a length equal to the length of the file to read.
      return null; // placeholder: build and return the array of splits here
   }
   
   @Override
   public void open(FileInputSplit split) throws IOException {
      super.open(split);
      objectRead = false;
   }
   
   @Override
   public boolean reachedEnd() throws IOException {
      return this.objectRead;
   }

   @Override
   public Object nextRecord(Object reuse) throws IOException {
      // use Kryo to deserialize the single object contained in this split's file
      Object yourObject = new Kryo().readClassAndObject(new Input(this.stream));
      this.objectRead = true; // read only one object per split
      return yourObject;
   }
}

2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <[hidden email]>:
Sorry Fabian but I don't understand what I should do :(
Could you provide me a simple snippet of code to achieve this?

On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <[hidden email]> wrote:
Enumeration of nested files is a feature of the FileInputFormat.
If you implement your own IF based on FileInputFormat as I suggested before, you can use that feature.

2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <[hidden email]>:
I have a directory containing a list of files, each one containing a kryo-serialized object.
With JSON-serialized objects I don't have that problem (but there I use env.readTextFile(path).withParameters(parameters),
where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).

On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <[hidden email]> wrote:
I don't know your use case.
The InputFormat interface is very flexible. Directories can be recursively read. A file can contain one or more objects. You can also make a smarter IF and put multiple (small) files into one split...

It is up to your use case what you need to implement.
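The "multiple (small) files into one split" idea boils down to greedy binning by size. A JDK-only sketch (illustrative names only, no Flink types assumed):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitBinning {
    // Greedily pack file sizes into bins no larger than maxBinSize;
    // an oversized file simply gets a bin of its own.
    static List<List<Long>> pack(List<Long> fileSizes, long maxBinSize) {
        List<List<Long>> bins = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long used = 0;
        for (long size : fileSizes) {
            if (!current.isEmpty() && used + size > maxBinSize) {
                bins.add(current);          // close the full bin
                current = new ArrayList<>();
                used = 0;
            }
            current.add(size);
            used += size;
        }
        if (!current.isEmpty()) {
            bins.add(current);              // close the trailing bin
        }
        return bins;
    }
}
```

A smarter input format would then create one split per bin instead of one split per file, amortizing the per-split overhead over many small files.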


2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <[hidden email]>:
Would that also be the case when recursively reading an entire directory that contains one object per file?

On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <[hidden email]> wrote:
You could implement your own InputFormat based on FileInputFormat and overwrite the createInputSplits method to just create a single split per file.

2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <[hidden email]>:
So what should I do?

On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <[hidden email]> wrote:
Ah, I checked the code.

The BinaryInputFormat expects metadata which is written by the BinaryOutputFormat.
So you cannot use the BinaryInputFormat to read a file which does not provide the metadata.
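What "expects metadata" means can be sketched with the plain JDK (a hypothetical mini-format for illustration only, not Flink's actual on-disk layout): the writer prepends a magic number and a record count, and the reader rejects any file that lacks them, which is exactly why a raw Kryo dump cannot be opened by BinaryInputFormat.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class HeaderedFile {
    static final int MAGIC = 0xB10C; // marks files written by this format

    static byte[] writeWithHeader(byte[] payload, int recordCount) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(MAGIC);        // metadata the matching reader looks for
        out.writeInt(recordCount);  // lets the reader plan its reads
        out.write(payload);
        out.flush();
        return bos.toByteArray();
    }

    static int readRecordCount(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        if (in.readInt() != MAGIC) {
            throw new IOException("missing format metadata: not written by this format");
        }
        return in.readInt();
    }
}
```

A file produced by writing Kryo bytes directly has no such header, so the matching reader fails on it, just as BinaryInputFormat does on a plain Kryo file.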

2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <[hidden email]>:
The file containing the serialized object is 7 bytes

On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <[hidden email]> wrote:
This might be an issue with the blockSize parameter of the BinaryInputFormat.
How large is the file with the single object?

2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <[hidden email]>:
I also tried with

DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);

but I get the same error :(

Moreover, in this example I put exactly one object per file so it should be able to deserialize it, right?

On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <[hidden email]> wrote:
If you create your file by just sequentially writing all objects to the file using Kryo, you can only read it with a parallelism of 1.
Writing binary files in a way that they can be read in parallel is a bit tricky (and not specific to Flink).
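The tricky part of parallel-readable binary files can be shown without Flink at all. A minimal JDK-only sketch (all names made up for illustration): length-prefix every record so a reader can walk record boundaries without deserializing anything, which is the first ingredient of a splittable layout (Flink's BinaryOutputFormat additionally writes per-block metadata):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class FramedRecords {
    // Write each payload preceded by its length in bytes.
    static byte[] write(List<byte[]> records) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        for (byte[] r : records) {
            out.writeInt(r.length); // 4-byte length prefix
            out.write(r);
        }
        out.flush();
        return bos.toByteArray();
    }

    // Read the records back by honoring the length prefixes.
    static List<byte[]> read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        List<byte[]> records = new ArrayList<>();
        while (in.available() > 0) {
            byte[] r = new byte[in.readInt()];
            in.readFully(r);
            records.add(r);
        }
        return records;
    }
}
```

Without such prefixes, a reader dropped at an arbitrary byte offset cannot tell where one Kryo record ends and the next begins; hence a sequentially written Kryo file is only safe to read with parallelism 1.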

2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <[hidden email]>:
Hi to all,
I'm trying to read a file serialized with Kryo, but I get this exception (due to the fact that createInputSplits creates 8 input splits, of which just one is not empty):

Caused by: java.io.IOException: Invalid argument
at sun.nio.ch.FileChannelImpl.position0(Native Method)
at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

-----------------------------------------------
My program is basically the following:

public static void main(String[] args) throws Exception {

...
// try-with-resources used to auto-close resources
try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
    // serialise object
    Kryo kryo = new Kryo();
    kryo.writeClassAndObject(output, myObj);
} catch (FileNotFoundException ex) {
LOG.error(ex.getMessage(), ex);
}

// deserialise object
myObj = null;

try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
    Kryo kryo = new Kryo();
    myObj = (MyClass) kryo.readClassAndObject(input);
} catch (FileNotFoundException ex) {
LOG.error(ex.getMessage(), ex);
}


final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
Configuration configuration = new Configuration();
configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);

TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
inputFormat.setFilePath("file:/tmp/KryoTest.ser");
inputFormat.configure(configuration);

DataSet<MyClass> ds = env.createInput(inputFormat);
ds.print();

}

private static final class MyClassSerializer extends Serializer<MyClass> {

@Override
public void write(Kryo kryo, Output output, MyClass object) {
kryo.writeClassAndObject(output, object);
}

@Override
public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
return (MyClass) kryo.readClassAndObject(input);
}
}

Am I doing something wrong?

Best,
Flavio


Re: Invalid argument reading a file containing a Kryo object

Fabian Hueske-2
Thanks a lot!

2015-08-10 12:20 GMT+02:00 Flavio Pompermaier <[hidden email]>:
Done through https://issues.apache.org/jira/browse/FLINK-2503

Thanks again,
Flavio

On Mon, Aug 10, 2015 at 12:11 PM, Fabian Hueske <[hidden email]> wrote:
Congrats that you got your InputFormat working!
It is true, there can be a few inconsistencies in the Formats derived from FileInputFormat.

It would be great if you could open JIRAs for these issues. Otherwise, they might get lost on the mailing list.

Thanks, Fabian
