Hi Allan,Map does support null but tuple serializer does not. You might want to use pojo or row types if you need to deal with null values. Read more here.- AbhishekOn Sun, 29 Sep 2019 at 14:01, allan <[hidden email]> wrote:Hi guys,
When I use like the code,
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
if (properties != null) {
return new Tuple2<>(cv_no, json.toJSONString());
}
return null;
}
})next,
.filter(f->f!=null)
I submit my job , then the job throws an exception as follows.
java.lang.NullPointerException
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
.
I found this method , record is null so the job threw an exception.why map can’t return null ? is this a bug?protected <X> void pushToOperator(StreamRecord<X> record) {
try {
// we know that the given outputTag matches our OutputTag so the record
// must be of the type that our operator (and Serializer) expects.
@SuppressWarnings("unchecked")
StreamRecord<T> castRecord = (StreamRecord<T>) record;
numRecordsIn.inc();
StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
operator.setKeyContextElement1(copy);
operator.processElement(copy);
} catch (ClassCastException e) {
if (outputTag != null) {
// Enrich error message
ClassCastException replace = new ClassCastException(
String.format(
"%s. Failed to push OutputTag with id '%s' to operator. " +
"This can occur when multiple OutputTags with different types " +
"but identical names are being used.",
e.getMessage(),
outputTag.getId()));
throw new ExceptionInChainedOperatorException(replace);
} else {
throw new ExceptionInChainedOperatorException(e);
}
} catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
}
}
Free forum by Nabble | Edit this page |