map can't return null

Posted by allan on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/map-can-t-return-null-tp30269.html

Hi guys

I use code like the following:

.map(new MapFunction<String, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> map(String value) throws Exception {
        if (properties != null) {
            return new Tuple2<>(cv_no, json.toJSONString());
        }
        return null;
    }
})

followed by:

.filter(f -> f != null)

When I submit the job, it throws the following exception:

java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
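
The top frames show TupleSerializer.copy being called from CopyingChainingOutput.pushToOperator, so the record appears to be copied before it is handed to the next chained operator. Here is a minimal sketch of how a field-by-field copy fails on a null record. NullCopySketch, Pair, copy and copyFails are hypothetical stand-ins written for illustration; this is not Flink's actual serializer code, only my assumption about the kind of dereference that throws.

```java
final class NullCopySketch {

    /** Hypothetical stand-in for a two-field tuple; not Flink's Tuple2. */
    static final class Pair {
        final String f0;
        final String f1;

        Pair(String f0, String f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    /** Copies a record field by field (assumed behaviour of a tuple copy). */
    static Pair copy(Pair from) {
        // Reading a field of 'from' throws NullPointerException when from == null.
        return new Pair(from.f0, from.f1);
    }

    /** Returns true if copying the given record throws a NullPointerException. */
    static boolean copyFails(Pair record) {
        try {
            copy(record);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(copyFails(new Pair("a", "b"))); // prints false
        System.out.println(copyFails(null));               // prints true
    }
}
```

If this assumption holds, the null returned by map never reaches the filter, because the copy between the two chained operators fails first.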


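I found the following method (pushToOperator in OperatorChain.CopyingChainingOutput, per the stack trace). The record's value is null, so the call to serializer.copy(...) throws the exception. Why can't map return null? Is this a bug?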
    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator (and Serializer) expects.
            @SuppressWarnings("unchecked")
            StreamRecord<T> castRecord = (StreamRecord<T>) record;

            numRecordsIn.inc();
            StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
            operator.setKeyContextElement1(copy);
            operator.processElement(copy);
        } catch (ClassCastException e) {
            if (outputTag != null) {
                // Enrich error message
                ClassCastException replace = new ClassCastException(
                    String.format(
                        "%s. Failed to push OutputTag with id '%s' to operator. " +
                            "This can occur when multiple OutputTags with different types " +
                            "but identical names are being used.",
                        e.getMessage(),
                        outputTag.getId()));

                throw new ExceptionInChainedOperatorException(replace);
            } else {
                throw new ExceptionInChainedOperatorException(e);
            }
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
}
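
One possible way to avoid emitting null at all would be to replace map + filter with a flatMap-style function that emits zero or one records through a collector. Flink provides FlatMapFunction and Collector for this. The sketch below does not use them: its Collector interface, the FlatMapSketch class and the "non-empty input" condition are hypothetical stand-ins so that it runs without Flink on the classpath. The real condition (properties != null) depends on code not shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FlatMapSketch {

    /** Hypothetical, simplified stand-in for Flink's Collector interface. */
    interface Collector<T> {
        void collect(T record);
    }

    /**
     * Emits one two-element record when the input is usable and nothing
     * otherwise, so a null record is never passed downstream.
     */
    static void flatMap(String value, Collector<String[]> out) {
        // Assumption: "usable" means non-null and non-empty.
        if (value != null && !value.isEmpty()) {
            out.collect(new String[] {value, "len=" + value.length()});
        }
        // No else branch: skipping collect() replaces "return null".
    }

    /** Runs flatMap over all inputs and gathers whatever was emitted. */
    static List<String[]> run(List<String> inputs) {
        List<String[]> emitted = new ArrayList<>();
        for (String input : inputs) {
            flatMap(input, emitted::add);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String[]> result = run(Arrays.asList("a", "", "b"));
        System.out.println(result.size()); // prints 2
    }
}
```

In the actual job this would presumably become .flatMap(new FlatMapFunction<String, Tuple2<String, String>>() { ... }) with out.collect(new Tuple2<>(cv_no, json.toJSONString())) inside the if branch, and the .filter(f -> f != null) step would no longer be needed.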