Un-ignored Parsing Exceptions in the CsvFormat

Un-ignored Parsing Exceptions in the CsvFormat

austin.ce
Hey all,

I'm ingesting CSV files with Flink 1.10.2 using SQL and the CSV Format[1].

Even with `ignoreParseErrors()` set, the job fails when it encounters some types of malformed rows. The root cause is indeed a `ParseException`, so I'm wondering if there's anything more I need to do to ignore these rows. Each field in the schema is a STRING.


I've configured the CSV format and table like so:
tableEnv.connect(
    new FileSystem()
        .path(path)
)
.withFormat(
    new Csv()
        .quoteCharacter('"')
        .ignoreParseErrors()
)
.withSchema(schema)
.inAppendMode()

Shot in the dark, but should `RowCsvInputFormat#parseRecord` check `isLenient()` when it hits an unexpected parser position?[2]
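
Something like the following, perhaps. This is only a rough sketch: the variable names are guessed from the surrounding code in `RowCsvInputFormat`, and the actual fix may need to look different.

// Hypothetical sketch, not a patch: check isLenient() before throwing,
// so a malformed record is dropped instead of failing the whole job.
startPos = skipFields(bytes, startPos, limit, this.fieldDelim);
if (startPos < 0) {
    if (isLenient()) {
        return false; // skip this record
    }
    throw new ParseException(String.format(
        "Unexpected parser position for column %1$s of row '%2$s'",
        field, new String(bytes, offset, numBytes)));
}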

Example error:

2020-10-16 12:50:18
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1098)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1066)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
Caused by: org.apache.flink.api.common.io.ParseException: Unexpected parser position for column 1 of row '",https://www.facebook.com/GoingOn-Networks-154758847925524/,https://www.linkedin.com/company/goingon,,"",,,,company,'
    at org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:204)
    at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:329)


Thanks,
Austin

Re: Un-ignored Parsing Exceptions in the CsvFormat

r_khachatryan
Hey Austin,

I think you are right. The problematic row contains an odd number of delimiters, in which case `skipFields` returns -1, which in turn leads to the exception.
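
To illustrate the failure mode, here is a simplified sketch of the quoted-field skip (not the actual Flink code, just the idea): with an odd number of quote characters, one opening quote is left unmatched, so the scan for its closing quote runs past the end of the record and -1 is returned to the caller.

// Simplified sketch (not the real implementation) of why an unbalanced
// quote makes skipFields return -1.
static int skipQuotedField(byte[] bytes, int startPos, int limit, byte quoteChar) {
    int i = startPos + 1;                  // skip the opening quote
    while (i < limit && bytes[i] != quoteChar) {
        i++;                               // consumes field delimiters too
    }
    if (i >= limit) {
        return -1;                         // record ended before the closing quote
    }
    return i + 1;                          // position just past the closing quote
}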

I opened a bug ticket https://issues.apache.org/jira/browse/FLINK-19711 to fix it.

Regards,
Roman


Re: Un-ignored Parsing Exceptions in the CsvFormat

austin.ce
Hey Roman,

Sorry to miss this -- thanks for the confirmation and for opening the ticket. I'm happy to propose a fix if someone can assign the ticket to me.

Best,
Austin

Re: Un-ignored Parsing Exceptions in the CsvFormat

r_khachatryan
Hey Austin,

I've assigned the ticket to you; it would be great if you could fix it!

Regards,
Roman

