Re: map can't return null

Posted by Abhishek Jain on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/map-can-t-return-null-tp30269p30270.html

Hi Allan,
Map does support null, but the tuple serializer does not. You might want to use POJO or Row types if you need to deal with null values. Read more here

- Abhishek
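
One way to act on the point that the tuple serializer rejects null is to never emit a null tuple at all: a flatMap-style function emits zero or one record per input, so no separate null filter is needed. The following is only a minimal sketch in plain Java under stated assumptions. `Pair`, `EmitOrSkip`, and the `key=value` input format are hypothetical stand-ins invented for illustration and are not Flink classes. In Flink itself, the corresponding pieces would be `FlatMapFunction` and `Collector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch class; not part of Flink.
final class EmitOrSkip {
    // flatMap-style function: emits zero or one Pair per input, never null.
    // The "key=value" input format is an assumption made for this sketch.
    static void parse(String value, Consumer<Pair> out) {
        int idx = value.indexOf('=');
        if (idx > 0) {
            out.accept(new Pair(value.substring(0, idx), value.substring(idx + 1)));
        }
        // Otherwise emit nothing, so no null ever reaches a downstream copy.
    }

    // Applies parse to every input and gathers whatever was emitted.
    static List<Pair> run(List<String> inputs) {
        List<Pair> results = new ArrayList<>();
        for (String in : inputs) {
            parse(in, results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Pair> out = run(Arrays.asList("a=1", "garbage", "b=2"));
        System.out.println(out.size()); // prints 2: "garbage" was skipped
    }
}

// Hypothetical stand-in for a two-field tuple; NOT Flink's Tuple2.
final class Pair {
    final String f0;
    final String f1;

    Pair(String f0, String f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}
```

The design choice here is that "no result" is expressed by not calling the output at all, rather than by a null value that a later step has to remove.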

On Sun, 29 Sep 2019 at 14:01, allan <[hidden email]> wrote:

Hi guys,

When I use code like this,

.map(new MapFunction<String, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> map(String value) throws Exception {
        if (properties != null) {
            return new Tuple2<>(cv_no, json.toJSONString());
        }
        return null;
    }
})

next,

.filter(f -> f != null)

I submit my job, and then the job throws the following exception:

java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

I found this method. The record is null, so the job threw an exception. Why can't map return null? Is this a bug?

    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator (and Serializer) expects.
            @SuppressWarnings("unchecked")
            StreamRecord<T> castRecord = (StreamRecord<T>) record;

            numRecordsIn.inc();
            StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
            operator.setKeyContextElement1(copy);
            operator.processElement(copy);
        } catch (ClassCastException e) {
            if (outputTag != null) {
                // Enrich error message
                ClassCastException replace = new ClassCastException(
                    String.format(
                        "%s. Failed to push OutputTag with id '%s' to operator. " +
                        "This can occur when multiple OutputTags with different types " +
                        "but identical names are being used.",
                        e.getMessage(),
                        outputTag.getId()));

                throw new ExceptionInChainedOperatorException(replace);
            } else {
                throw new ExceptionInChainedOperatorException(e);
            }
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
}
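
The stack trace shows the exception coming from `TupleSerializer.copy`, which `pushToOperator` calls via `serializer.copy(castRecord.getValue())`. The sketch below illustrates why a field-by-field copy fails on a null value. It assumes the copy reads the fields of the source object one at a time. `SimpleTuple` and `CopySketch` are hypothetical stand-ins written for this illustration and do not reproduce Flink's actual implementation.

```java
// Hypothetical sketch class; not part of Flink.
final class CopySketch {
    // Field-by-field copy: reading from.fields dereferences `from`,
    // so a null record fails before any field is copied.
    static SimpleTuple copy(SimpleTuple from) {
        Object[] copied = new Object[from.fields.length];
        for (int i = 0; i < copied.length; i++) {
            copied[i] = from.fields[i];
        }
        return new SimpleTuple(copied);
    }

    // Returns true if copying the given record throws a NullPointerException.
    static boolean copyThrowsNpe(SimpleTuple record) {
        try {
            copy(record);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(copyThrowsNpe(new SimpleTuple("cv_no", "{}"))); // prints false
        System.out.println(copyThrowsNpe(null));                           // prints true
    }
}

// Hypothetical stand-in for a tuple with positional fields; NOT a Flink class.
final class SimpleTuple {
    final Object[] fields;

    SimpleTuple(Object... fields) {
        this.fields = fields;
    }
}
```

Under this assumption, the `.filter(f -> f != null)` step comes too late: the copy happens while the record is handed from the map operator to the next chained operator, before the filter function sees it.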