Hi all,
I am reading a large number of GZip-compressed CSV files, nested in an HDFS directory:

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);

DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
    .ignoreFirstLine()
    .fieldDelimiter("|")
    .includeFields("011000")
    .types(String.class, Long.class)
    .withParameters(parameters);

My job is failing with the following exception:

2016-10-04 17:19:59,933 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
java.io.EOFException: Unexpected end of ZLIB input stream
    at java.util.zip.InflaterInputStream.fill(Unknown Source)
    at java.util.zip.InflaterInputStream.read(Unknown Source)
    at java.util.zip.GZIPInputStream.read(Unknown Source)
    at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
    at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
    at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
    at java.lang.Thread.run(Unknown Source)

I think it is due to some improperly compressed files. How can I handle and ignore such exceptions?

Thanks.
Best,
Yassine
Hi Yassine,

AFAIK, there is no built-in way to ignore corrupted compressed files. You could implement a custom InputFormat that wraps the CsvInputFormat; the wrapper would catch and ignore the EOFException.

2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <[hidden email]>:
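A quick way to confirm which files are actually corrupt is a standalone check that fully decompresses each .gz file and reports truncated streams. Below is a minimal sketch using only java.util.zip; the class name and local path are illustrative, not from the thread:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.zip.GZIPInputStream;

public class GzipChecker {
    public static void main(String[] args) throws IOException {
        // Walk a local copy of the input directory and try to decompress every .gz file.
        Files.walk(Paths.get("/data/logs"))
            .filter(p -> p.toString().endsWith(".gz"))
            .forEach(p -> {
                try (InputStream in = new GZIPInputStream(new FileInputStream(p.toFile()))) {
                    byte[] buf = new byte[8192];
                    while (in.read(buf) != -1) { /* drain the stream */ }
                } catch (IOException e) {
                    // Truncated or corrupt archives end up here,
                    // e.g. "Unexpected end of ZLIB input stream".
                    System.err.println(p + " is corrupt: " + e.getMessage());
                }
            });
    }
}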
How about just overriding the "readLine()" method to call "super.readLine()" and catching EOF exceptions?

On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <[hidden email]> wrote:
Thank you Fabian and Stephan for the suggestions. I couldn't override "readLine()" because it's final, so I went with Fabian's solution, but I'm struggling with the CSV field masks. Any help is appreciated.

I created an input format which is basically TupleCsvInputFormat, with the nextRecord() method overridden to catch the exceptions. But I'm getting a java.lang.ArrayIndexOutOfBoundsException when I add a boolean[]{true, false, true} field mask. If I add an int[]{1, 0, 1} field mask instead, the job succeeds but outputs the first and second columns. Here is my code:

TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
Path histPath = new Path("hdfs:///shared/file.csv");

CsvInputFormat<Tuple2<String, String>> myInputFormat = new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
myInputFormat.enableQuotedStringParsing('"');
myInputFormat.setSkipFirstLineAsHeader(true);
myInputFormat.setLenient(true);

DataSet<Tuple2<String, String>> test = env.createInput(myInputFormat, typeInfo).withParameters(parameters);
test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);

and here is the custom input format:

import java.io.IOException;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
import org.apache.flink.core.fs.Path;

public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {

    private static final long serialVersionUID = 1L;

    private TupleSerializerBase<OUT> tupleSerializer;

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo, createDefaultMask(tupleTypeInfo.getArity()));
    }

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        super(filePath);
        boolean[] mask = (includedFieldsMask == null)
                ? createDefaultMask(tupleTypeInfo.getArity())
                : toBooleanMask(includedFieldsMask);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
    }

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        super(filePath);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
    }

    private void configure(String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        if (tupleTypeInfo.getArity() == 0) {
            throw new IllegalArgumentException("Tuple size must be greater than 0.");
        }
        if (includedFieldsMask == null) {
            includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
        }
        tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
        setDelimiter(lineDelimiter);
        setFieldDelimiter(fieldDelimiter);
        Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
        for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
            classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
        }
        setFieldsGeneric(includedFieldsMask, classes);
    }

    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
    }

    @Override
    public OUT nextRecord(OUT record) {
        OUT returnRecord = null;
        do {
            try {
                returnRecord = super.nextRecord(record);
            } catch (IOException e) {
                // Swallow read errors (e.g. the EOFException from a truncated
                // GZip stream) and keep going until a record is read or the
                // split is exhausted.
                e.printStackTrace();
            }
        } while (returnRecord == null && !reachedEnd());
        return returnRecord;
    }
}

Thanks,
Yassine

2016-10-04 18:35 GMT+02:00 Stephan Ewen <[hidden email]>:
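A note on the two mask flavours in the code above, which may explain what was observed with the int[] mask: as far as I can tell, GenericCsvInputFormat's toBooleanMask treats an int[] as a list of included field indices rather than as 0/1 flags, so new int[]{1, 0, 1} means "include fields 0 and 1", i.e. exactly the first and second columns. A small self-contained snippet mirroring that conversion (MaskDemo is a hypothetical name, and the helper is a re-implementation for illustration, not the Flink class itself):

public class MaskDemo {

    // Mirrors the index-based conversion that, to my understanding,
    // GenericCsvInputFormat.toBooleanMask applies to an int[] mask.
    static boolean[] toBooleanMask(int[] fieldIndices) {
        int max = 0;
        for (int i : fieldIndices) {
            max = Math.max(max, i);
        }
        boolean[] mask = new boolean[max + 1];
        for (int i : fieldIndices) {
            mask[i] = true;  // each int names an included column index
        }
        return mask;
    }

    public static void main(String[] args) {
        // {1, 0, 1} is read as the set of indices {0, 1}, not per-column flags,
        // so it selects the first and second columns.
        System.out.println(java.util.Arrays.toString(toBooleanMask(new int[]{1, 0, 1})));
        // prints [true, true]
    }
}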
Hi Yassine,

I ran your code without problems and got the correct result.

2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <[hidden email]>:
Hi Fabian,

I tried to debug the code, and it turns out a line in my CSV data is causing the ArrayIndexOutOfBoundsException. Here is the exception stack trace:

java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
    at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
    at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
    at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
    at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
    at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

And here is a sample CSV:

timestamp,url,id
2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",0000000

Using my code, I get the previous exception when parsing the sample CSV. If I use the following code, I get an incorrect result: (2016-08-31 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31 12:08:11.223, 0000000):

DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
2016-10-11 11:21 GMT+02:00 Fabian Hueske <[hidden email]>:
I forgot to add parseQuotedStrings('"'). After adding it, I get the same exception with the second code too.

2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI <[hidden email]>:
Hi,

Flink's String parser does not support escaped quotes. Your data contains a doubled double quote (""), and the parser identifies this as the end of the string field.

2016-10-11 13:37 GMT+02:00 Yassine MARZOUGUI <[hidden email]>:
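One possible workaround: read the file as plain text and do the quote-aware splitting by hand, treating "" inside a quoted field as an escaped quote. A minimal sketch, assuming records never span multiple lines; the splitting logic below is hand-rolled for illustration, not a Flink API:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class EscapedQuotesWorkaround {

    // Splits one CSV line on commas, honouring quotes and RFC-4180 style "" escapes.
    static List<String> splitCsvLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    current.append('"');   // "" inside quotes is an escaped quote
                    i++;
                } else {
                    inQuotes = !inQuotes;  // opening or closing quote
                }
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, String>> parsed = env
                .readTextFile("hdfs:///shared/file.csv")
                .filter(line -> !line.startsWith("timestamp"))  // skip the header line
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String line) {
                        List<String> f = splitCsvLine(line);
                        // keep the timestamp and id columns (fields 0 and 2)
                        return new Tuple2<>(f.get(0), f.get(2));
                    }
                });

        parsed.print();
    }
}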
I posted a workaround for that at https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java

On 11 Oct 2016 1:57 p.m., "Fabian Hueske" <[hidden email]> wrote:
Thank you Fabian and Flavio for your help.

Best,
Yassine

2016-10-11 14:02 GMT+02:00 Flavio Pompermaier <[hidden email]>: