Hi to all,
I've created a dataset of Tuple2<String, byte[]> and saved it on my local FS (a folder with 8 files, because I run the program with parallelism 8) with the following code:

Configuration configuration = new Configuration();
TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat = new TypeSerializerOutputFormat<>();
outputFormat.setOutputFilePath(new Path(targetDir));
outputFormat.setWriteMode(WriteMode.OVERWRITE);
outputFormat.configure(configuration);
ds.output(outputFormat);

If I read that folder back from the local FS everything works, but if I read it from HDFS I get the following exception:

java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
    at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)

Could you help me understand what's going on? The code I use to read the serialized dataset is:

TupleTypeInfo<Tuple2<String, byte[]>> tInfo = new TupleTypeInfo<Tuple2<String, byte[]>>(
    BasicTypeInfo.STRING_TYPE_INFO,
    PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat = new TypeSerializerInputFormat<>(tInfo);
inputFormat.setFilePath(new Path(inputDir));
inputFormat.configure(conf);
DataSet<Tuple6<String, String, String, String, String, String>> ret = env.createInput(inputFormat).flatMap(XXX);

Best,
Flavio
Have you tried to explicitly set the block size parameter when writing and reading? The default value might differ between the local FS and HDFS.

2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <[hidden email]>:
How can I retrieve the right one? If I write with a block size different from the one of HDFS, can I still read it then?
On Fri, May 8, 2015 at 5:40 PM, Fabian Hueske <[hidden email]> wrote:
The value of the parameter is not important for correctness but it must be the same when writing and reading. Try setting it to 64 MB.

2015-05-08 17:52 GMT+02:00 Flavio Pompermaier <[hidden email]>:
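(A minimal, untested sketch of what this could look like, assuming the public BLOCK_SIZE_PARAMETER_KEY constants of BinaryOutputFormat/BinaryInputFormat, which the TypeSerializer formats extend, and reusing ds, outputFormat and inputFormat from the first mail:)

import org.apache.flink.api.common.io.BinaryInputFormat;
import org.apache.flink.api.common.io.BinaryOutputFormat;
import org.apache.flink.configuration.Configuration;

long blockSize = 64L * 1024 * 1024; // 64 MB, the same value on the write and read side

// write side
Configuration writeConf = new Configuration();
writeConf.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);
outputFormat.configure(writeConf);
ds.output(outputFormat);

// read side
Configuration readConf = new Configuration();
readConf.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);
inputFormat.configure(readConf);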
If I haven't set that parameter in the job code, do you think Flink automatically infers it from somewhere in the Hadoop xxx-site.xml files or from the Hadoop cluster?
On Fri, May 8, 2015 at 6:02 PM, Fabian Hueske <[hidden email]> wrote:
The point is that you don't want Flink to automatically infer the parameter, because the default depends on the file system: writing a file to the local FS uses a different default than writing to HDFS.

2015-05-08 18:07 GMT+02:00 Flavio Pompermaier <[hidden email]>:
I reached this solution, do you think it could be ok (taking into account that my local fs block size is 4096):
blockSize = new org.apache.hadoop.conf.Configuration().getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);

This way, if I find the Hadoop config files in the resources folder I use that block size, otherwise 4096. Even if I run the job locally, I'll get a consistent setting whether the Hadoop config files are there or not.

Now I have another problem: the byte[] in the Tuple2 is written with Thrift 0.9.2, but the Thrift in the Flink dist is 0.6.1, and I think this is the cause of errors like:

java.lang.AbstractMethodError: org.apache.thrift.TUnion.readValue(Lorg/apache/thrift/protocol/TProtocol;Lorg/apache/thrift/protocol/TField;)Ljava/lang/Object;
    at org.apache.thrift.TUnion.read(TUnion.java:135)
    at org.apache.jena.riot.thrift.BinRDF.apply(BinRDF.java:187)
    at org.apache.jena.riot.thrift.BinRDF.applyVisitor(BinRDF.java:176)
    at org.apache.jena.riot.thrift.BinRDF.protocolToStream(BinRDF.java:164)
    at org.apache.jena.riot.thrift.BinRDF.inputStreamToStream(BinRDF.java:149)

What is the best way to fix such version-mismatch problems?

On Fri, May 8, 2015 at 6:14 PM, Fabian Hueske <[hidden email]> wrote:
I think you are confusing the block sizes of the local FS, HDFS, and the BinaryIF/OF. The block size of the local FS is not relevant here, and 4 KB is much too small in any case: each block results in one split, so a 4 KB block size will create thousands of splits and cause a lot of scheduling overhead. The HDFS block size should be a multiple of the BinaryIF/OF block size to avoid data access over the network. I would set the default block size to 32 or 64 MB, given that multiples of 64 MB are common values for HDFS block sizes.
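(Along those lines, an untested refinement of the earlier snippet: take the block size from the Hadoop config if it is on the classpath, fall back to 64 MB instead of 4096, and use the same value on both sides via the assumed parameter keys from the sketch above:)

import org.apache.hadoop.hdfs.DFSConfigKeys;

// fall back to 64 MB instead of the 4 KB local-FS value
// (assumes dfs.blocksize is a plain byte count, not a suffixed value like "128m")
long blockSize = new org.apache.hadoop.conf.Configuration()
    .getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 64L * 1024 * 1024);

org.apache.flink.configuration.Configuration flinkConf = new org.apache.flink.configuration.Configuration();
flinkConf.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize); // when writing
flinkConf.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);  // when reading
outputFormat.configure(flinkConf);
inputFormat.configure(flinkConf);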
Working around dependency mismatches is a bit tricky in such large systems that have many (transitive) dependencies. You are probably colliding with the Thrift version packaged by Hadoop (which in turn is packaged by Flink). You can fix that for now by compiling your own version of Flink after setting the Thrift version to 0.9.2 in the root POM file. As a quick workaround, you can try to simply include Thrift 0.9.2 in your user jar (and that way override the Thrift classes for user code). If Thrift 0.9.2 is backwards compatible, we can bump the Flink-internal version of Thrift and fix this for the future. If not, we may have to shade it away like some other dependencies...