In the Flink dashboard, my job is failing with a NullPointerException, but the exception shown there has no stack trace. I also don't see any NullPointerExceptions in the flink-jobmanager or flink-taskmanager logs.
Is this a normal issue?
One of the Exception instances finally reported a stack trace. I'm not sure why it's so infrequent.

java.lang.NullPointerException: null
    at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$7(RowData.java:338) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.lambda$createRowConverter$6827278$1(AvroRowDataSerializationSchema.java:177) ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
    at org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.addElement(AvroFileSystemFormatFactory.java:251) ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
    at org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.addElement(AvroFileSystemFormatFactory.java:247) ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
    at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:498) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:494) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:202) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at StreamExecCalc$793.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.operators.join.interval.EmitAwareCollector.collect(EmitAwareCollector.java:48) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.operators.join.interval.EmitAwareCollector.collect(EmitAwareCollector.java:28) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.operators.join.interval.TimeIntervalJoin.lambda$removeExpiredRows$2(TimeIntervalJoin.java:411) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
    at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_265]
    at org.apache.flink.table.runtime.operators.join.interval.TimeIntervalJoin.removeExpiredRows(TimeIntervalJoin.java:408) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.operators.join.interval.TimeIntervalJoin.onTimer(TimeIntervalJoin.java:332) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin.onTimer(RowTimeIntervalJoin.java:30) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276) ~[flink-dist_2.12-1.11.1.jar:1.11.1]

On Wed, Dec 9, 2020 at 4:33 PM Dan Hill <[hidden email]> wrote:
It looks like the problem is in how a null value is read in the AvroRowDataDeserializationSchema (see below for the snippet of code from Flink 1.11.1). Because of a bad typing of the source, the call to createConverter() within createNullableConverter() returns null, leaving a null entry in fieldConverters[i] and, in the end, producing the NullPointerException in fieldConverters[i].convert(). Does it make sense?

    static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
        final DeserializationRuntimeConverter[] fieldConverters =
            rowType.getFields().stream()
                .map(RowType.RowField::getType)
                .map(AvroRowDataDeserializationSchema::createNullableConverter)
                .toArray(DeserializationRuntimeConverter[]::new);
        final int arity = rowType.getFieldCount();
        return avroObject -> {
            IndexedRecord record = (IndexedRecord) avroObject;
            GenericRowData row = new GenericRowData(arity);
            for (int i = 0; i < arity; ++i) {
                // NPE here if createNullableConverter() returned null for field i
                row.setField(i, fieldConverters[i].convert(record.get(i)));
            }
            return row;
        };
    }

Best,
Flavio

On Thu, Dec 10, 2020 at 8:39 AM Dan Hill <[hidden email]> wrote:
Yea, the error makes sense and was an easy fix. Any idea what happened with the hidden stack trace? The missing stack trace made this 100x more difficult to debug. On Thu, Dec 10, 2020 at 12:59 AM Flavio Pompermaier <[hidden email]> wrote:
Hi Dan, Do you have an example job and some sample data to reproduce this problem? I couldn't reproduce it locally with a simple example job. Cheers, Till On Thu, Dec 10, 2020 at 5:51 PM Dan Hill <[hidden email]> wrote:
Hmm, I don't have a good job that I can separate out for reproduction. I was using Table SQL and inserting a long field (which was null) into a table that sinks to Avro; the exception was being thrown from that Avro function. I can watch to see if it keeps happening. On Fri, Dec 11, 2020 at 3:31 AM Till Rohrmann <[hidden email]> wrote:
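For reference, a minimal sketch of the kind of job described above, not the actual job: the class name, table name, and path are hypothetical, and it assumes Flink 1.11's filesystem connector with the avro format. Whether it actually triggers the NPE may depend on how the source column is typed, per the earlier analysis.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class NullLongAvroSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Hypothetical Avro filesystem sink with a single nullable BIGINT column.
            tEnv.executeSql(
                "CREATE TABLE avro_sink (id BIGINT) WITH ("
                    + " 'connector' = 'filesystem',"
                    + " 'path' = 'file:///tmp/avro-sink',"
                    + " 'format' = 'avro')");

            // Insert a null long, as described above.
            tEnv.executeSql("INSERT INTO avro_sink SELECT CAST(NULL AS BIGINT)");
        }
    }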
Ok, then let's see whether it recurs. What you could do is revert the fix and check the stack trace again. Cheers, Till On Sat, Dec 12, 2020, 02:16 Dan Hill <[hidden email]> wrote:
This is a performance optimization in the JVM that kicks in when the same exception is thrown too frequently: after a while the JIT omits the stack trace for that exception. You can set `-XX:-OmitStackTraceInFastThrow` to disable the feature. You can typically find the full stack trace in the log from before the optimization kicked in. On Sat, Dec 12, 2020 at 2:05 AM Till Rohrmann <[hidden email]> wrote:
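In a Flink deployment this flag can typically be passed to the JobManager and TaskManager JVMs via the env.java.opts option in flink-conf.yaml. To see the optimization itself in action, here is a small self-contained demo (not from the thread, and independent of Flink): with default JVM flags the NullPointerException's stack trace eventually disappears once the loop gets hot, while with `-XX:-OmitStackTraceInFastThrow` it never does.

    public class OmitStackTraceDemo {
        public static void main(String[] args) {
            Object o = null;
            for (int i = 0; i < 1_000_000; i++) {
                try {
                    o.toString(); // implicit NPE, thrown by the JVM itself
                } catch (NullPointerException e) {
                    // Once HotSpot JIT-compiles the hot path, it starts throwing a
                    // preallocated exception with an empty stack trace ("fast throw").
                    if (e.getStackTrace().length == 0) {
                        System.out.println("Stack trace omitted starting around iteration " + i);
                        return;
                    }
                }
            }
            System.out.println("Stack traces were never omitted");
        }
    }

Run it once with no extra flags and once with `java -XX:-OmitStackTraceInFastThrow OmitStackTraceDemo` to compare the two behaviors.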
Thanks! That makes sense. On Sat, Dec 12, 2020 at 11:13 AM Steven Wu <[hidden email]> wrote:
Thanks a lot for this information, Steven. I learned something new again :-) Cheers, Till On Sat, Dec 12, 2020 at 9:02 PM Dan Hill <[hidden email]> wrote: