Handling decompression exceptions

Handling decompression exceptions

Yassine MARZOUGUI
Hi all,

I am reading a large number of GZip-compressed CSV files, nested in an HDFS directory:

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
                .ignoreFirstLine()
                .fieldDelimiter("|")
                .includeFields("011000")
                .types(String.class, Long.class)
                .withParameters(parameters);

My job is failing with the following exception:
2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.

java.io.EOFException: Unexpected end of ZLIB input stream
	at java.util.zip.InflaterInputStream.fill(Unknown Source)
	at java.util.zip.InflaterInputStream.read(Unknown Source)
	at java.util.zip.GZIPInputStream.read(Unknown Source)
	at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
	at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
	at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
	at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
	at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
	at java.lang.Thread.run(Unknown Source)
I think it is due to some improperly compressed files. How can I handle and ignore such exceptions? Thanks.

Best,
Yassine

Re: Handling decompression exceptions

Fabian Hueske-2
Hi Yassine,

AFAIK, there is no built-in way to ignore corrupted compressed files.
You could try to implement a FileInputFormat that wraps the CsvInputFormat and forwards all calls to the wrapped CsvIF.
The wrapper would also catch and ignore the EOFException.

If you do that, you would not be able to use the env.readCsvFile() shortcut but would need to create an instance of your own InputFormat and add it with
env.readFile(yourIF).

Hope this helps,
Fabian
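
A rough sketch of such a wrapper, for illustration only (the class name is hypothetical, imports are omitted as elsewhere in this thread, the remaining FileInputFormat methods would be forwarded the same way, and this is not Fabian's actual code):

public class TolerantCsvInputFormat<T> extends FileInputFormat<T> {

    private final CsvInputFormat<T> wrapped;
    private boolean corrupt;

    public TolerantCsvInputFormat(CsvInputFormat<T> wrapped) {
        super(wrapped.getFilePath());
        this.wrapped = wrapped;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        corrupt = false;
        wrapped.open(split);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        // once a split is flagged as corrupt, report it as finished
        return corrupt || wrapped.reachedEnd();
    }

    @Override
    public T nextRecord(T reuse) throws IOException {
        try {
            return wrapped.nextRecord(reuse);
        } catch (EOFException e) {
            // badly compressed / truncated file: give up on the rest of this split
            corrupt = true;
            return null;
        }
    }

    @Override
    public void close() throws IOException {
        wrapped.close();
    }
}

It could then be created around a configured CsvInputFormat and added with something like env.readFile(new TolerantCsvInputFormat<>(csvFormat), "hdfs:///shared/logs/").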


Re: Handling decompression exceptions

Stephan Ewen
How about just overriding the "readLine()" method to call "super.readLine()" and catching EOF exceptions?


Re: Handling decompression exceptions

Yassine MARZOUGUI
Thank you Fabian and Stephan for the suggestions.
I couldn't override "readLine()" because it's final, so I went with Fabian's solution, but I'm struggling with CSV field masks. Any help is appreciated.
I created an input format which is basically TupleCsvInputFormat, with the nextRecord() method overridden to catch the exceptions. But I'm getting a java.lang.ArrayIndexOutOfBoundsException when I add a boolean[]{true, false, true} field mask. If I add an int[]{1,0,1} field mask, the job succeeds but outputs the first and second columns. Here is my code:

TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
Path histPath = new Path("hdfs:///shared/file.csv");

CsvInputFormat <Tuple2<String, String>> myInputFormt = new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
myInputFormt.enableQuotedStringParsing('"');
myInputFormt.setSkipFirstLineAsHeader(true);
myInputFormt.setLenient(true);

DataSet<Tuple2<String, String>> test = env.createInput(myInputFormt,typeInfo).withParameters(parameters);
test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);

And here is the custom input format:

public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
    private static final long serialVersionUID = 1L;
    private TupleSerializerBase<OUT> tupleSerializer;
    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
    }
    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo, createDefaultMask(tupleTypeInfo.getArity()));
    }
    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }
    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        super(filePath);
        boolean[] mask = (includedFieldsMask == null)
                ? createDefaultMask(tupleTypeInfo.getArity())
                : toBooleanMask(includedFieldsMask);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
    }
    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }
    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        super(filePath);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
    }
    private void configure(String lineDelimiter, String fieldDelimiter,
                           TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        if (tupleTypeInfo.getArity() == 0) {
            throw new IllegalArgumentException("Tuple size must be greater than 0.");
        }
        if (includedFieldsMask == null) {
            includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
        }
        tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
        setDelimiter(lineDelimiter);
        setFieldDelimiter(fieldDelimiter);
        Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
        for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
            classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
        }
        setFieldsGeneric(includedFieldsMask, classes);
    }
    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
    }
   
    // Catch IOExceptions (e.g. from a corrupted gzip stream), log them, and
    // keep reading instead of failing the whole job.
    @Override
    public OUT nextRecord(OUT record) {
        OUT returnRecord = null;
        do {
            try {
                returnRecord = super.nextRecord(record);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } while (returnRecord == null && !reachedEnd());
        return returnRecord;
    }

}

Thanks,
Yassine






Re: Handling decompression exceptions

Fabian Hueske-2
Hi Yassine,

I ran your code without problems and got the correct result.
Can you provide the stack trace of the exception?

Thanks, Fabian


Re: Handling decompression exceptions

Yassine MARZOUGUI
Hi Fabian,

I tried to debug the code, and it turns out a line in my CSV data is causing the ArrayIndexOutOfBoundsException. Here is the exception stack trace:

java.lang.ArrayIndexOutOfBoundsException: -1
at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

And here is a sample CSV:

timestamp,url,id
2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",0000000

Using my code, I get the previous exception when parsing the sample CSV. If I use the following code, I get an incorrect result: (2016-08-31 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31 12:08:11.223, 0000000).
DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.includeFields("101")
.ignoreInvalidLines()
.types(String.class, String.class);
withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

Is it a bug in Flink, or is my data not compliant with the CSV standard?

Thanks,
Yassine


Re: Handling decompression exceptions

Yassine MARZOUGUI
I forgot to add parseQuotedStrings('"'). After adding it, I'm getting the same exception with the second code snippet too.


Re: Handling decompression exceptions

Fabian Hueske-2
Hi,

Flink's String parser does not support escaped quotes. Your data contains a doubled double quote (""). The parser identifies this as the end of the string field.
As a workaround, you can read the file as a regular text file, line by line, and do the parsing in a MapFunction.

Best, Fabian
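
A minimal sketch of that workaround for the three-column sample above, using a FlatMapFunction so that the header and malformed lines can simply be dropped (the header check, splitting logic, and selected fields are assumptions based on the sample; imports are omitted as elsewhere in this thread, and this is not Fabian's actual code):

DataSet<Tuple2<String, String>> parsed = env.readTextFile("hdfs:///shared/file.csv")
    .flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, String>> out) {
            if (line.startsWith("timestamp,")) {
                return; // skip the header line
            }
            // Minimal RFC-4180-style split: quoted fields may contain commas
            // and doubled quotes ("").
            List<String> fields = new ArrayList<>();
            StringBuilder current = new StringBuilder();
            boolean inQuotes = false;
            for (int i = 0; i < line.length(); i++) {
                char c = line.charAt(i);
                if (c == '"') {
                    if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        current.append('"'); // doubled quote inside a quoted field
                        i++;
                    } else {
                        inQuotes = !inQuotes;
                    }
                } else if (c == ',' && !inQuotes) {
                    fields.add(current.toString());
                    current.setLength(0);
                } else {
                    current.append(c);
                }
            }
            fields.add(current.toString());
            if (fields.size() == 3) {
                // keep only the timestamp and id columns
                out.collect(Tuple2.of(fields.get(0), fields.get(2)));
            }
            // lines with an unexpected number of fields are silently dropped
        }
    });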


Re: Handling decompression exceptions

Flavio Pompermaier

I posted a workaround for that at https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java


On 11 Oct 2016 1:57 p.m., "Fabian Hueske" <[hidden email]> wrote:
Hi,

Flink's String parser does not support escaped quotes. You data contains a double double quote (""). The parser identifies this as the end of the string field.
As a workaround, you can read the file as a regular text file, line by line and do the parsing in a MapFunction.

Best, Fabian

2016-10-11 13:37 GMT+02:00 Yassine MARZOUGUI <[hidden email]>:
Forgot to add parseQuotedStrings('"'). After adding it I'm getting the same exception with the second code too.

2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI <[hidden email]>:
Hi Fabian,

I tried to debug the code, and it turns out a line in my csv data is causing the ArrayIndexOutOfBoundsException, here is the exception stacktrace:

java.lang.ArrayIndexOutOfBoundsException: -1
at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

And here is a sample CSV:

timestamp,url,id
2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",0000000

Using my code, I get the previous exception when parsing the sample CSV. If I use the following code, I get an incorrect result : (2016-08-31 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31 12:08:11.223, 0000000)
DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.includeFields("101")
.ignoreInvalidLines()
.types(String.class, String.class);
withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

Is it a bug in Flink or is my data not compliant with the csv standards?

Thanks,
Yassine

2016-10-11 11:21 GMT+02:00 Fabian Hueske <[hidden email]>:
Hi Yassine,

I ran your code without problems and got the correct result.
Can you provide the Stacktrace of the Exception?

Thanks, Fabian

2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <[hidden email]>:
Thank you Fabian and Stephan for the suggestions.
I couldn't override "readLine()" because it's final, so went with Fabian's solution, but I'm struggling with csv field masks. Any help is appreciated.
I created an Input Format which is basically TupleCsvInputFormat for which I overrode the nextRecord() method to catch the exceptions. But I'm having a java.lang.ArrayIndexOutOfBoundsException  when I add a boolean[]{true, false, true} field mask . If I add a int[]{1,0,1} field mask, the job succeeds but outputs the first and second columns. Here is my code:

TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
Path histPath = new Path("hdfs:///shared/file.csv");

CsvInputFormat <Tuple2<String, String>> myInputFormt = new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
myInputFormt.enableQuotedStringParsing('"');
myInputFormt.setSkipFirstLineAsHeader(true);
myInputFormt.setLenient(true);

DataSet<Tuple2<String, String>> test = env.createInput(myInputFormt,typeInfo).withParameters(parameters);
test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);

and here is the  custom input format:

public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
    private static final long serialVersionUID = 1L;
    private TupleSerializerBase<OUT> tupleSerializer;
    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
    }
    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo, createDefaultMask(tupleTypeInfo.getArity()));
    }
    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }
    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        super(filePath);
        boolean[] mask = (includedFieldsMask == null)
                ? createDefaultMask(tupleTypeInfo.getArity())
                : toBooleanMask(includedFieldsMask);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
    }
    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }
    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        super(filePath);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
    }
    private void configure(String lineDelimiter, String fieldDelimiter,
                           TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        if (tupleTypeInfo.getArity() == 0) {
            throw new IllegalArgumentException("Tuple size must be greater than 0.");
        }
        if (includedFieldsMask == null) {
            includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
        }
        tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
        setDelimiter(lineDelimiter);
        setFieldDelimiter(fieldDelimiter);
        Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
        for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
            classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
        }
        setFieldsGeneric(includedFieldsMask, classes);
    }
    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
    }
   
    @Override
    public OUT nextRecord(OUT record) {
        OUT returnRecord = null;
        do {
            try {
                returnRecord = super.nextRecord(record);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } while (returnRecord == null && !reachedEnd());
        return returnRecord;
    }

}

Thanks,
Yassine





2016-10-04 18:35 GMT+02:00 Stephan Ewen <[hidden email]>:
How about just overriding the "readLine()" method to call "super.readLine()" and catching EOF exceptions?

On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <[hidden email]> wrote:
Hi Yassine,

AFAIK, there is no built-in way to ignore corrupted compressed files.
You could try to implement a FileInputFormat that wraps the CsvInputFormat and forwards all calls to the wrapped CsvIF.
The wrapper would also catch and ignore the EOFException.

If you do that, you would not be able to use the env.readCsvFile() shortcut but would need to create an instance of your own InputFormat and add it with
env.readFile(yourIF).

Hope this helps,
Fabian

2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <[hidden email]>:
Hi all,

I am reading a large number of GZip compressed csv files, nested in a HDFS directory:

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
                .ignoreFirstLine()
                .fieldDelimiter("|")
                .includeFields("011000")
                .types(String.class, Long.class)
                .withParameters(parameters);

My job is failing with the following exception:
2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.

java.io.EOFException: Unexpected end of ZLIB input stream
	at java.util.zip.InflaterInputStream.fill(Unknown Source)
	at java.util.zip.InflaterInputStream.read(Unknown Source)
	at java.util.zip.GZIPInputStream.read(Unknown Source)
	at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
	at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
	at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
	at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
	at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
	at java.lang.Thread.run(Unknown Source)
I think it is due to some improperly compressed files. How can I handle and ignore such exceptions? Thanks.

Best,
Yassine

Re: Handling decompression exceptions

Yassine MARZOUGUI
Thank you Fabian and Flavio for your help.

Best,
Yassine

2016-10-11 14:02 GMT+02:00 Flavio Pompermaier <[hidden email]>:

I posted a workaround for that at https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java


On 11 Oct 2016 1:57 p.m., "Fabian Hueske" <[hidden email]> wrote:
Hi,

Flink's String parser does not support escaped quotes. Your data contains a doubled double quote (""), which the parser identifies as the end of the string field.
As a workaround, you can read the file as a regular text file, line by line and do the parsing in a MapFunction.

Best, Fabian
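
A rough sketch of that workaround, assuming the (timestamp,url,id) layout and file path from the sample quoted further down, and assuming the id column never contains a comma, so splitting on the first and last comma side-steps the quoted url field entirely; a FlatMapFunction is used instead of a plain MapFunction so the header and malformed lines can simply be dropped:

DataSet<Tuple2<String, String>> parsed = env.readTextFile("C:\\Users\\yassine\\Desktop\\test.csv")
        .flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, String>> out) {
                if (line.startsWith("timestamp,")) {
                    return; // skip the header line
                }
                int firstComma = line.indexOf(',');
                int lastComma = line.lastIndexOf(',');
                if (firstComma < 0 || lastComma <= firstComma) {
                    return; // drop malformed lines instead of failing the job
                }
                // keep the timestamp (before the first comma) and the id (after the last comma)
                out.collect(new Tuple2<>(line.substring(0, firstComma), line.substring(lastComma + 1)));
            }
        });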

2016-10-11 13:37 GMT+02:00 Yassine MARZOUGUI <[hidden email]>:
I forgot to add parseQuotedStrings('"'). After adding it, I get the same exception with the second code too.
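
For reference, a sketch of where that call sits in the reader chain from the message quoted below; as explained in the reply above, it still fails on the escaped quote:

DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
                .ignoreFirstLine()
                .parseQuotedStrings('"')   // the call that was initially missing
                .fieldDelimiter(",")
                .includeFields("101")
                .ignoreInvalidLines()
                .types(String.class, String.class);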

2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI <[hidden email]>:
Hi Fabian,

I tried to debug the code, and it turns out a line in my CSV data is causing the ArrayIndexOutOfBoundsException. Here is the exception stack trace:

java.lang.ArrayIndexOutOfBoundsException: -1
at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

And here is a sample CSV:

timestamp,url,id
2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",0000000

Using my code, I get the previous exception when parsing the sample CSV. If I use the following code, I get an incorrect result: (2016-08-31 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31 12:08:11.223, 0000000)
DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
                .ignoreFirstLine()
                .fieldDelimiter(",")
                .includeFields("101")
                .ignoreInvalidLines()
                .types(String.class, String.class);
withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

Is it a bug in Flink, or is my data not compliant with the CSV standard?

Thanks,
Yassine

2016-10-11 11:21 GMT+02:00 Fabian Hueske <[hidden email]>:
Hi Yassine,

I ran your code without problems and got the correct result.
Can you provide the stack trace of the exception?

Thanks, Fabian

2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <[hidden email]>:
Thank you Fabian and Stephan for the suggestions.
I couldn't override "readLine()" because it's final, so I went with Fabian's solution, but I'm struggling with the CSV field masks. Any help is appreciated.
I created an input format which is basically TupleCsvInputFormat, with the nextRecord() method overridden to catch the exceptions. But I'm getting a java.lang.ArrayIndexOutOfBoundsException when I add a boolean[]{true, false, true} field mask. If I add an int[]{1, 0, 1} field mask, the job succeeds but outputs the first and second columns. Here is my code:

TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
Path histPath = new Path("hdfs:///shared/file.csv");

CsvInputFormat <Tuple2<String, String>> myInputFormt = new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
myInputFormt.enableQuotedStringParsing('"');
myInputFormt.setSkipFirstLineAsHeader(true);
myInputFormt.setLenient(true);

DataSet<Tuple2<String, String>> test = env.createInput(myInputFormt,typeInfo).withParameters(parameters);
test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);

and here is the custom input format:

public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
    private static final long serialVersionUID = 1L;
    private TupleSerializerBase<OUT> tupleSerializer;
    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
    }
    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo, createDefaultMask(tupleTypeInfo.getArity()));
    }
    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }
    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        super(filePath);
        boolean[] mask = (includedFieldsMask == null)
                ? createDefaultMask(tupleTypeInfo.getArity())
                : toBooleanMask(includedFieldsMask);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
    }
    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }
    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        super(filePath);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
    }
    private void configure(String lineDelimiter, String fieldDelimiter,
                           TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        if (tupleTypeInfo.getArity() == 0) {
            throw new IllegalArgumentException("Tuple size must be greater than 0.");
        }
        if (includedFieldsMask == null) {
            includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
        }
        tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
        setDelimiter(lineDelimiter);
        setFieldDelimiter(fieldDelimiter);
        Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
        for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
            classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
        }
        setFieldsGeneric(includedFieldsMask, classes);
    }
    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
    }
   
    @Override
    public OUT nextRecord(OUT record) {
        OUT returnRecord = null;
        do {
            try {
                returnRecord = super.nextRecord(record);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } while (returnRecord == null && !reachedEnd());
        return returnRecord;
    }

}

Thanks,
Yassine





2016-10-04 18:35 GMT+02:00 Stephan Ewen <[hidden email]>:
How about just overriding the "readLine()" method to call "super.readLine()" and catching EOF exceptions?

On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <[hidden email]> wrote:
Hi Yassine,

AFAIK, there is no built-in way to ignore corrupted compressed files.
You could try to implement a FileInputFormat that wraps the CsvInputFormat and forwards all calls to the wrapped CsvIF.
The wrapper would also catch and ignore the EOFException.

If you do that, you would not be able to use the env.readCsvFile() shortcut but would need to create an instance of your own InputFormat and add it with
env.readFile(yourIF).

Hope this helps,
Fabian

2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <[hidden email]>:
Hi all,

I am reading a large number of GZip compressed csv files, nested in a HDFS directory:

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
                .ignoreFirstLine()
                .fieldDelimiter("|")
                .includeFields("011000")
                .types(String.class, Long.class)
                .withParameters(parameters);

My job is failing with the following exception:
2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.

java.io.EOFException: Unexpected end of ZLIB input stream
	at java.util.zip.InflaterInputStream.fill(Unknown Source)
	at java.util.zip.InflaterInputStream.read(Unknown Source)
	at java.util.zip.GZIPInputStream.read(Unknown Source)
	at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
	at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
	at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
	at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
	at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
	at java.lang.Thread.run(Unknown Source)
I think it is due to some improperly compressed files. How can I handle and ignore such exceptions? Thanks.

Best,
Yassine