Hi to all,
I'm trying to read a file serialized with Kryo, but I get this exception (due to the fact that createInputSplits creates 8 input splits, where just one is not empty..):

```
Caused by: java.io.IOException: Invalid argument
	at sun.nio.ch.FileChannelImpl.position0(Native Method)
	at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
	at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
	at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
	at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)
```

My program is basically the following:

```java
public static void main(String[] args) throws Exception {
    ...
    // try-with-resources used to autoclose resources
    try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
        // serialize object
        Kryo kryo = new Kryo();
        kryo.writeClassAndObject(output, myObj);
    } catch (FileNotFoundException ex) {
        LOG.error(ex.getMessage(), ex);
    }

    // deserialize object
    myObj = null;
    try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
        Kryo kryo = new Kryo();
        myObj = (MyClass) kryo.readClassAndObject(input);
    } catch (FileNotFoundException ex) {
        LOG.error(ex.getMessage(), ex);
    }

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);

    Configuration configuration = new Configuration();
    configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64 * 1024 * 1024);

    TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
    final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
    inputFormat.setFilePath("file:/tmp/KryoTest.ser");
    inputFormat.configure(configuration);

    DataSet<MyClass> ds = env.createInput(inputFormat);
    ds.print();
}

private static final class MyClassSerializer extends Serializer<MyClass> {

    @Override
    public void write(Kryo kryo, Output output, MyClass object) {
        kryo.writeClassAndObject(output, object);
    }

    @Override
    public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
        return (MyClass) kryo.readClassAndObject(input);
    }
}
```

Am I doing something wrong?

Best,
Flavio
If you create your file by just sequentially writing all objects to the file using Kryo, you can only read it with a parallelism of 1. Writing binary files in a way that they can be read in parallel is a bit tricky (and not specific to Flink).

2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <[hidden email]>:
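Fabian's point can be illustrated without Flink at all. Below is a minimal sketch (plain `java.io`, all names illustrative): a raw stream of back-to-back Kryo objects has no record boundaries a second reader could seek to, so one common remedy is to length-prefix each record and keep the record offsets, which independent readers can then seek to.

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: length-prefix every record and remember where each one starts,
// so a parallel reader can seek straight to "record i" instead of having
// to scan the whole stream from the beginning.
public class SplittableLayout {

    // Write length-prefixed records; return the start offset of each record.
    public static long[] write(Path file, byte[][] records) throws IOException {
        long[] offsets = new long[records.length];
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(Files.newOutputStream(file)))) {
            long pos = 0;
            for (int i = 0; i < records.length; i++) {
                offsets[i] = pos;
                out.writeInt(records[i].length); // 4-byte record header
                out.write(records[i]);           // record payload
                pos += 4 + records[i].length;
            }
        }
        return offsets;
    }

    // A parallel reader seeks directly to a known offset and reads one record.
    public static byte[] readAt(Path file, long offset) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(offset);
            byte[] record = new byte[raf.readInt()];
            raf.readFully(record);
            return record;
        }
    }
}
```

With a plain Kryo dump there is no such header or offset information, which is why every split except the one starting at offset 0 comes up empty or fails to seek.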
I also tried with
```java
DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
```

but I get the same error :( Moreover, in this example I put exactly one object per file, so it should be able to deserialize it, right?

On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <[hidden email]> wrote:
This might be an issue with the blockSize parameter of the BinaryInputFormat. How large is the file with the single object?

2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <[hidden email]>:
The file containing the serialized object is 7 bytes
On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <[hidden email]> wrote:
Ah, I checked the code. So you cannot use the BinaryInputFormat to read a file which does not provide the metadata. The BinaryInputFormat expects metadata which is written by the BinaryOutputFormat.

2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <[hidden email]>:
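For intuition on the metadata BinaryInputFormat expects: block-based binary formats lay the file out in fixed-size blocks, each carrying a small header, so a reader can seek to `blockIndex * blockSize` without any external index. The sketch below is a plain-Java illustration of that idea under assumed names; it is not Flink's actual BlockInfo layout.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Sketch: fixed-size blocks, each starting with its own record count,
// followed by length-prefixed records and zero padding. Any reader can
// jump to block k at offset k * BLOCK_SIZE with no external index.
public class BlockFile {

    static final int BLOCK_SIZE = 64; // tiny, for illustration only

    // Write each list of records into its own fixed-size block.
    public static void write(Path file, List<List<byte[]>> blocks) throws IOException {
        try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(file))) {
            for (List<byte[]> block : blocks) {
                int written = 4;
                out.writeInt(block.size());      // block header: record count
                for (byte[] r : block) {
                    out.writeInt(r.length);      // length-prefixed record
                    out.write(r);
                    written += 4 + r.length;
                }
                if (written > BLOCK_SIZE) {
                    throw new IOException("records do not fit into one block");
                }
                out.write(new byte[BLOCK_SIZE - written]); // pad to block size
            }
        }
    }

    // Read all records of one block by seeking straight to its start.
    public static List<byte[]> readBlock(Path file, int blockIndex) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek((long) blockIndex * BLOCK_SIZE);
            int count = raf.readInt();
            List<byte[]> records = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                byte[] r = new byte[raf.readInt()];
                raf.readFully(r);
                records.add(r);
            }
            return records;
        }
    }
}
```

A 7-byte file written by a bare `kryo.writeClassAndObject` contains none of this structure, so a format that assumes it will seek to invalid positions, as in the stack trace above.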
So what should I do?
On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <[hidden email]> wrote:
You could implement your own InputFormat based on FileInputFormat and override the createInputSplits method to just create a single split per file.

2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <[hidden email]>:
Would this also be the way to go for recursively reading an entire directory containing one object per file?
On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <[hidden email]> wrote:
I don't know your use case. The InputFormat interface is very flexible. Directories can be recursively read. A file can contain one or more objects. You can also make a smarter IF and put multiple (small) files into one split...

2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <[hidden email]>:
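The "one split per file, directories read recursively" strategy Fabian describes can be sketched independently of Flink. In a real InputFormat this logic would live in `createInputSplits()`; the `Split` class below is a hypothetical stand-in for Flink's `FileInputSplit`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch: recursively enumerate all regular files under a directory and
// create exactly one (path, start = 0, length = fileSize) split per file.
public class OneSplitPerFile {

    /** A (path, start, length) triple, standing in for Flink's FileInputSplit. */
    public static final class Split {
        public final Path path;
        public final long start;
        public final long length;

        Split(Path path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }
    }

    // One split per regular file; directories are traversed recursively.
    public static List<Split> createSplits(Path dir) throws IOException {
        List<Split> splits = new ArrayList<>();
        try (Stream<Path> walk = Files.walk(dir)) {
            List<Path> regularFiles = walk.filter(Files::isRegularFile)
                    .sorted()
                    .collect(Collectors.toList());
            for (Path p : regularFiles) {
                // each file is read whole, by a single reader
                splits.add(new Split(p, 0, Files.size(p)));
            }
        }
        return splits;
    }
}
```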
I have a directory containing a list of files, each one containing a kryo-serialized object.
With JSON-serialized objects I don't have that problem (but there I use env.readTextFile(path).withParameters(parameters), where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).

On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <[hidden email]> wrote:
Enumeration of nested files is a feature of the FileInputFormat. If you implement your own IF based on FileInputFormat as I suggested before, you can use that feature.

2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <[hidden email]>:
Sorry Fabian but I don't understand what I should do :(
Could you provide me a simple snippet of code to achieve this? On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <[hidden email]> wrote:
You need to do something like this:

```java
// Input splits must start at 0 and have a length equal to the length of the file to read.
public class YourInputFormat extends FileInputFormat<Object> {

    private boolean objectRead;

    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) {
        // Create one FileInputSplit for each file you want to read.
        // Check FileInputFormat for how to recursively enumerate files.
        return null;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        objectRead = false;
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return this.objectRead;
    }

    @Override
    public Object nextRecord(Object reuse) throws IOException {
        Object yourObject = this.stream.read(); // use Kryo here to read from this.stream
        this.objectRead = true; // read only one object
        return yourObject;
    }
}
```

2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <[hidden email]>:
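The open()/reachedEnd()/nextRecord() protocol in the skeleton above can be mimicked without Flink: the reader hands back exactly one record per "split" (here simply the whole file's bytes; a Kryo Input would decode an object instead) and then reports end-of-input. All names below are illustrative.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Minimal stand-alone mimic of Flink's one-record-per-split pattern:
// open() resets the flag, nextRecord() consumes everything once,
// reachedEnd() then stops the read loop.
public class SingleObjectReader {

    private InputStream stream;
    private boolean objectRead;

    public void open(Path file) throws IOException {
        stream = Files.newInputStream(file);
        objectRead = false;
    }

    public boolean reachedEnd() {
        return objectRead;
    }

    public byte[] nextRecord() throws IOException {
        byte[] all = stream.readAllBytes(); // a Kryo Input would decode here instead
        objectRead = true;                  // only one object per split
        stream.close();
        return all;
    }
}
```

A driver loop would then be the familiar `open(split); while (!reachedEnd()) { nextRecord(); }` shape.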
Hi Fabian,
thanks to your help I finally managed to successfully generate a DataSet from my folder, but I think that there are some inconsistencies in the hierarchy of InputFormats. The BinaryInputFormat/TypeSerializerInputFormat should somehow inherit the behaviour of the FileInputFormat (so respect unsplittable and enumerateNestedFiles), but they don't take those flags into account. Moreover, in the TypeSerializerInputFormat there's a "// TODO: fix this shit" that maybe should be removed or fixed :) Also, keeping testForUnsplittable and decorateInputStream aligned is somewhat dangerous.. And maybe the visibility of getBlockIndexForPosition should be changed to protected? So basically, my need was to implement a TypeSerializerInputFormat<RowBundle>, but to achieve that I had to make a lot of overrides.. am I doing something wrong, or should those input formats be improved..?

This is my IF code (remark: from the comment "Copied from FileInputFormat (override TypeSerializerInputFormat)" on, the code is copied-and-pasted from FileInputFormat.. thus MY code ends there):

```java
public class RowBundleInputFormat extends TypeSerializerInputFormat<RowBundle> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(RowBundleInputFormat.class);

    /** The fraction that the last split may be larger than the others. */
    private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;

    private boolean objectRead;

    public RowBundleInputFormat() {
        super(new GenericTypeInfo<>(RowBundle.class));
        unsplittable = true;
    }

    @Override
    protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream,
            FileInputSplit fileSplit) throws Throwable {
        return inputStream;
    }

    @Override
    protected boolean testForUnsplittable(FileStatus pathFile) {
        return true;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        objectRead = false;
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return this.objectRead;
    }

    @Override
    public RowBundle nextRecord(RowBundle reuse) throws IOException {
        RowBundle yourObject = super.nextRecord(reuse);
        this.objectRead = true; // read only one object
        return yourObject;
    }

    // -------------------------------------------------------------------
    // Copied from FileInputFormat (override TypeSerializerInputFormat)
    // -------------------------------------------------------------------

    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        if (minNumSplits < 1) {
            throw new IllegalArgumentException("Number of input splits has to be at least 1.");
        }

        // take the desired number of splits into account
        minNumSplits = Math.max(minNumSplits, this.numSplits);

        final Path path = this.filePath;
        final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(minNumSplits);

        // get all the files that are involved in the splits
        List<FileStatus> files = new ArrayList<FileStatus>();
        long totalLength = 0;

        final FileSystem fs = path.getFileSystem();
        final FileStatus pathFile = fs.getFileStatus(path);

        if (pathFile.isDir()) {
            // input is directory. list all contained files
            final FileStatus[] dir = fs.listStatus(path);
            for (int i = 0; i < dir.length; i++) {
                if (dir[i].isDir()) {
                    if (enumerateNestedFiles) {
                        if (acceptFile(dir[i])) {
                            totalLength += addNestedFiles(dir[i].getPath(), files, 0, true);
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Directory " + dir[i].getPath().toString()
                                        + " did not pass the file-filter and is excluded.");
                            }
                        }
                    }
                } else {
                    if (acceptFile(dir[i])) {
                        files.add(dir[i]);
                        totalLength += dir[i].getLen();
                        // as soon as there is one deflate file in a directory,
                        // we can not split it
                        testForUnsplittable(dir[i]);
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("File " + dir[i].getPath().toString()
                                    + " did not pass the file-filter and is excluded.");
                        }
                    }
                }
            }
        } else {
            testForUnsplittable(pathFile);
            files.add(pathFile);
            totalLength += pathFile.getLen();
        }

        // returns if unsplittable
        if (unsplittable) {
            int splitNum = 0;
            for (final FileStatus file : files) {
                final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
                Set<String> hosts = new HashSet<String>();
                for (BlockLocation block : blocks) {
                    hosts.addAll(Arrays.asList(block.getHosts()));
                }
                long len = file.getLen();
                if (testForUnsplittable(file)) {
                    len = READ_WHOLE_SPLIT_FLAG;
                }
                FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, len,
                        hosts.toArray(new String[hosts.size()]));
                inputSplits.add(fis);
            }
            return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
        }

        final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE
                : (totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1));

        // now that we have the files, generate the splits
        int splitNum = 0;
        for (final FileStatus file : files) {

            final long len = file.getLen();
            final long blockSize = file.getBlockSize();

            final long minSplitSize;
            if (this.minSplitSize <= blockSize) {
                minSplitSize = this.minSplitSize;
            } else {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Minimal split size of " + this.minSplitSize
                            + " is larger than the block size of " + blockSize
                            + ". Decreasing minimal split size to block size.");
                }
                minSplitSize = blockSize;
            }

            final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
            final long halfSplit = splitSize >>> 1;

            final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);

            if (len > 0) {
                // get the block locations and make sure they are in order with
                // respect to their offset
                final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
                Arrays.sort(blocks);

                long bytesUnassigned = len;
                long position = 0;

                int blockIndex = 0;

                while (bytesUnassigned > maxBytesForLastSplit) {
                    // get the block containing the majority of the data
                    blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
                    // create a new split
                    FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position,
                            splitSize, blocks[blockIndex].getHosts());
                    inputSplits.add(fis);

                    // adjust the positions
                    position += splitSize;
                    bytesUnassigned -= splitSize;
                }

                // assign the last split
                if (bytesUnassigned > 0) {
                    blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
                    final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(),
                            position, bytesUnassigned, blocks[blockIndex].getHosts());
                    inputSplits.add(fis);
                }
            } else {
                // special case with a file of zero bytes size
                final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
                String[] hosts;
                if (blocks.length > 0) {
                    hosts = blocks[0].getHosts();
                } else {
                    hosts = new String[0];
                }
                final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0,
                        hosts);
                inputSplits.add(fis);
            }
        }

        return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
    }

    /**
     * Recursively traverse the input directory structure and enumerate all
     * accepted nested files.
     *
     * @return the total length of accepted files.
     */
    private long addNestedFiles(Path path, List<FileStatus> files, long length,
            boolean logExcludedFiles) throws IOException {
        final FileSystem fs = path.getFileSystem();

        for (FileStatus dir : fs.listStatus(path)) {
            if (dir.isDir()) {
                if (acceptFile(dir)) {
                    addNestedFiles(dir.getPath(), files, length, logExcludedFiles);
                } else {
                    if (logExcludedFiles && LOG.isDebugEnabled()) {
                        LOG.debug("Directory " + dir.getPath().toString()
                                + " did not pass the file-filter and is excluded.");
                    }
                }
            } else {
                if (acceptFile(dir)) {
                    files.add(dir);
                    length += dir.getLen();
                    testForUnsplittable(dir);
                } else {
                    if (logExcludedFiles && LOG.isDebugEnabled()) {
                        LOG.debug("Directory " + dir.getPath().toString()
                                + " did not pass the file-filter and is excluded.");
                    }
                }
            }
        }
        return length;
    }

    /**
     * Retrieves the index of the <tt>BlockLocation</tt> that contains the part
     * of the file described by the given offset.
     *
     * @param blocks The different blocks of the file. Must be ordered by their offset.
     * @param offset The offset of the position in the file.
     * @param startIndex The earliest index to look at.
     * @return The index of the block containing the given position.
     */
    private int getBlockIndexForPosition(BlockLocation[] blocks, long offset, long halfSplitSize,
            int startIndex) {
        // go over all indexes after the startIndex
        for (int i = startIndex; i < blocks.length; i++) {
            long blockStart = blocks[i].getOffset();
            long blockEnd = blockStart + blocks[i].getLength();

            if (offset >= blockStart && offset < blockEnd) {
                // got the block where the split starts
                // check if the next block contains more than this one does
                if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
                    return i + 1;
                } else {
                    return i;
                }
            }
        }
        throw new IllegalArgumentException("The given offset is not contained in the any block.");
    }
}
```

On Sun, Aug 9, 2015 at 2:00 PM, Fabian Hueske <[hidden email]> wrote:
Flavio Pompermaier
Phone: +(39) 0461 283 702
Congrats that you got your InputFormat working! It is true, there can be a few inconsistencies in the formats derived from FileInputFormat.

2015-08-10 12:02 GMT+02:00 Flavio Pompermaier <[hidden email]>:
Done through https://issues.apache.org/jira/browse/FLINK-2503
Thanks again,
Flavio

On Mon, Aug 10, 2015 at 12:11 PM, Fabian Hueske <[hidden email]> wrote:
Thanks a lot!

2015-08-10 12:20 GMT+02:00 Flavio Pompermaier <[hidden email]>: