EOF reading file from HDFS

Flavio Pompermaier
Hi to all,

I've created a DataSet of Tuple2<String, byte[]> and saved it to my local FS (a folder with 8 files, because I run the program with parallelism 8) with the following code:

Configuration configuration = new Configuration();
// Write the DataSet in Flink's binary serialization format (one file per parallel task).
TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat = new TypeSerializerOutputFormat<>();
outputFormat.setOutputFilePath(new Path(targetDir));
outputFormat.setWriteMode(WriteMode.OVERWRITE);
outputFormat.configure(configuration);
ds.output(outputFormat);

Then, if I read such a folder from the local FS everything is fine, but if I read it from HDFS I get the following exception:

java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)

Could you help in understanding what's going on?

The code I use to read the serialized ds is:

// Read the serialized tuples back with the same type information used for writing.
TupleTypeInfo<Tuple2<String, byte[]>> tInfo = new TupleTypeInfo<>(
        BasicTypeInfo.STRING_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat = new TypeSerializerInputFormat<>(tInfo);
inputFormat.setFilePath(new Path(inputDir));
inputFormat.configure(conf);
DataSet<Tuple6<String, String, String, String, String, String>> ret = env.createInput(inputFormat).flatMap(XXX);


Best,
Flavio

Re: EOF reading file from HDFS

Fabian Hueske-2
Have you tried to explicitly set the blocksize parameter when writing and reading?
The default value might be different when reading from local FS and HDFS.
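
For example, something along these lines (a rough, untested sketch; I'm assuming the BLOCK_SIZE_PARAMETER_KEY constants of BinaryOutputFormat/BinaryInputFormat, which the TypeSerializer formats extend, are the right place to pass the value):

// Rough sketch: pass the same explicit block size to both formats via the Configuration
// handed to configure(). The parameter keys are assumptions based on the binary formats.
long blockSize = 64L * 1024 * 1024; // 64 MB on both the write and the read side

Configuration writeConf = new Configuration();
writeConf.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize); // "output.block_size"
outputFormat.configure(writeConf);

Configuration readConf = new Configuration();
readConf.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize); // "input.block_size"
inputFormat.configure(readConf);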


Re: EOF reading file from HDFS

Flavio Pompermaier
How can I retrieve the right one? If I write with a block size different from the one of HDFS, can I still read it then?


Re: EOF reading file from HDFS

Fabian Hueske-2
The value of the parameter is not important for correctness but it must be the same when writing and reading.
Try setting it to 64 MB.



Re: EOF reading file from HDFS

Flavio Pompermaier
If I haven't set that param in the code of the job, do you think Flink automatically infers it from somewhere in the Hadoop xxx-site.xml files or from the Hadoop cluster?


Re: EOF reading file from HDFS

Fabian Hueske-2
The point is that you don't want Flink to automatically infer the parameter, because the default depends on the file system: if you write a file to the local FS, a different default is used than if you wrote to HDFS.
Just set the parameter to the same value, e.g. 64 MB, for both reading and writing.


Re: EOF reading file from HDFS

Flavio Pompermaier
I came up with this solution, do you think it could be OK (taking into account that my local FS block size is 4096)?

// Use the HDFS block size if a Hadoop configuration is found on the classpath, otherwise fall back to 4096.
blockSize = new org.apache.hadoop.conf.Configuration().getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);

This way, if the Hadoop config files are found in the resources folder I use that block size, otherwise 4096.
So even if I run the job locally, I'll have a consistent setting whether the Hadoop config files are there or not.

Now I have another problem: the byte[] of the Tuple2 is written using Thrift 0.9.2, but the one in the Flink dist is 0.6.1, and I think this is the cause of errors like

 java.lang.AbstractMethodError: org.apache.thrift.TUnion.readValue(Lorg/apache/thrift/protocol/TProtocol;Lorg/apache/thrift/protocol/TField;)Ljava/lang/Object;
at org.apache.thrift.TUnion.read(TUnion.java:135)
at org.apache.jena.riot.thrift.BinRDF.apply(BinRDF.java:187)
at org.apache.jena.riot.thrift.BinRDF.applyVisitor(BinRDF.java:176)
at org.apache.jena.riot.thrift.BinRDF.protocolToStream(BinRDF.java:164)
at org.apache.jena.riot.thrift.BinRDF.inputStreamToStream(BinRDF.java:149)

What is the best way to fix such version mismatching problems?


Re: EOF reading file from HDFS

Fabian Hueske-2

I think you are confusing the block sizes of the local FS, HDFS, and the BinaryIF/OF. The block size of the local FS is not relevant here. In fact, 4 KB is much too small: each block results in one split, so a 4 KB block size will create thousands of splits and a lot of scheduling overhead.

The HDFS block size should be a multiple of the BinaryIF/OF block size to avoid data access over the network.

I would set the default block size to 32 or 64 MB, given that multiples of 64 MB are common values for HDFS block sizes.
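
If you want to keep deriving it from the Hadoop config, a rough sketch (untested; DFSConfigKeys.DFS_BLOCK_SIZE_KEY and the parameter key constants below are my assumptions) could look like this:

// Untested sketch: take the HDFS block size if a Hadoop config is on the classpath,
// otherwise fall back to 64 MB, and use the same value for writing and reading.
long blockSize = new org.apache.hadoop.conf.Configuration()
        .getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 64L * 1024 * 1024);

Configuration flinkConf = new Configuration();
flinkConf.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize); // write side
flinkConf.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);  // read side

outputFormat.configure(flinkConf);
inputFormat.configure(flinkConf);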


Re: EOF reading file from HDFS

Stephan Ewen
Working around dependency mismatches is a bit tricky in such large systems that have many (transitive) dependencies. You are probably colliding with the Thrift version packaged by Hadoop (which in turn is packaged by Flink).

You can fix that for now by compiling your own version of Flink after setting the Thrift version to 0.9.2 in the root POM file. As a quick workaround, you can try to simply include Thrift 0.9.2 in your user jar (and that way override the Thrift classes for the user code).

If Thrift 0.9.2 is backwards compatible, we can bump the Flink-internal version of Thrift and fix this for the future. If not, we may have to shade it away like some other dependencies...

