map can't return null


allan

Hi guys

I use code like this:

.map(new MapFunction<String, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> map(String value) throws Exception {
        if (properties != null) {
            return new Tuple2<>(cv_no, json.toJSONString());
        }
        return null;
    }
})

followed by:

.filter(f -> f != null)

When I submit the job, it throws the following exception:

java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

 

I found this method: the record's value is null, so the copy throws and the job fails. Why can't map return null? Is this a bug?
protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator (and Serializer) expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;

        numRecordsIn.inc();
        StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
        operator.setKeyContextElement1(copy);
        operator.processElement(copy);
    } catch (ClassCastException e) {
        if (outputTag != null) {
            // Enrich error message
            ClassCastException replace = new ClassCastException(
                String.format(
                    "%s. Failed to push OutputTag with id '%s' to operator. " +
                        "This can occur when multiple OutputTags with different types " +
                        "but identical names are being used.",
                    e.getMessage(),
                    outputTag.getId()));

            throw new ExceptionInChainedOperatorException(replace);
        } else {
            throw new ExceptionInChainedOperatorException(e);
        }
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}
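Read bottom-up, the stack trace shows the order of events: StreamMap.processElement calls the user's map(), passes the result through CountingOutput to CopyingChainingOutput.pushToOperator, and that method copies the value with TupleSerializer.copy before the next chained operator (here the null-check filter) receives it. As far as I know, this copying output is the one used when object reuse is disabled, which is Flink's default. The following is a minimal, stdlib-only model of that order; Pair, CopyingChainModel and their methods are hypothetical names, not Flink classes. It also suggests why a try/catch inside the body of map() would not help: the copy runs in the caller after map() has already returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for Tuple2<String, String>; not Flink's class.
final class Pair {
    final String f0;
    final String f1;

    Pair(String f0, String f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Hypothetical model of a chained map -> filter pipeline whose intermediate
// output copies every record, the way CopyingChainingOutput does.
final class CopyingChainModel {

    // Models a tuple serializer's copy(): it reads the record's fields,
    // so copying a null record throws NullPointerException.
    static Pair copy(Pair from) {
        return new Pair(from.f0, from.f1);
    }

    static List<Pair> run(List<String> input,
                          Function<String, Pair> mapper,
                          Predicate<Pair> filter) {
        List<Pair> out = new ArrayList<>();
        for (String value : input) {
            Pair mapped = mapper.apply(value); // the user's map() has returned here
            Pair copied = copy(mapped);        // the copy runs before the filter...
            if (filter.test(copied)) {         // ...so the filter never sees the null
                out.add(copied);
            }
        }
        return out;
    }
}
```

In the model, as in the real trace, the exception is raised by copy() while no frame of the user's map function is on the stack.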

Re: map can't return null

Abhishek Jain
Hi Allan,
Map does support null, but the tuple serializer does not. You might want to use POJO or Row types if you need to deal with null values. Read more here

- Abhishek
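To illustrate the POJO suggestion: as far as I know, Flink's tuple types do not allow null fields, whereas a class that follows Flink's POJO rules (public class, public no-argument constructor, fields that are public or have getters and setters) can hold null fields. A minimal sketch, assuming a hypothetical CvRecord class that replaces the Tuple2<String, String> from the question. This concerns null fields inside a record; it does not by itself make returning a null record from map() safe.

```java
// Hypothetical POJO replacing Tuple2<String, String> from the question.
// In a real job this class should be public (in its own file) so Flink can
// analyze it as a POJO type; it is package-private here only so the snippet
// compiles as a single file.
class CvRecord {
    public String cvNo; // may be null
    public String json; // may be null

    public CvRecord() {
        // no-argument constructor required by the POJO rules
    }

    public CvRecord(String cvNo, String json) {
        this.cvNo = cvNo;
        this.json = json;
    }

    @Override
    public String toString() {
        return "CvRecord{cvNo=" + cvNo + ", json=" + json + "}";
    }
}
```

In the job, the map function's output type would then be CvRecord instead of Tuple2<String, String>.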

On Sun, 29 Sep 2019 at 14:01, allan <[hidden email]> wrote:


Re: map can't return null

Biao Liu
Hi allan,

It's not a bug. Flink does not support null values; see discussion [1].

In your example, a MapFunction has to return something even when there is nothing to return. You could use flatMap instead of map to handle this case: a FlatMapFunction is allowed to collect nothing (simply skip collecting when there is no data to return). Does that satisfy your requirement?
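A minimal sketch of the flatMap pattern described above. In Flink this would be a FlatMapFunction<String, Tuple2<String, String>> whose flatMap(String value, Collector<Tuple2<String, String>> out) calls out.collect(...) only when there is a result; here java.util.function.Consumer stands in for Flink's Collector so the snippet runs without Flink on the classpath. The "cvNo|json" input format and the parse helper are hypothetical stand-ins for however properties, cv_no and json are derived in the original job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class FlatMapSketch {

    // Hypothetical input format "cvNo|json"; returns null when the line cannot
    // be parsed, playing the role of `properties == null` in the question.
    static Map.Entry<String, String> parse(String value) {
        int sep = value.indexOf('|');
        if (sep <= 0) {
            return null;
        }
        return Map.entry(value.substring(0, sep), value.substring(sep + 1));
    }

    // Same shape as FlatMapFunction.flatMap(value, out): emit zero or one record.
    static void flatMap(String value, Consumer<Map.Entry<String, String>> out) {
        Map.Entry<String, String> record = parse(value);
        if (record != null) {
            out.accept(record); // in Flink: out.collect(Tuple2.of(...))
        }
        // otherwise nothing is emitted, so no null reaches the next operator
    }

    // Runs flatMap over a batch of inputs and gathers everything emitted.
    static List<Map.Entry<String, String>> runAll(List<String> input) {
        List<Map.Entry<String, String>> emitted = new ArrayList<>();
        for (String value : input) {
            flatMap(value, emitted::add);
        }
        return emitted;
    }
}
```

Because unusable input simply produces no output, the separate `.filter(f -> f != null)` step from the question is no longer needed.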


On Sun, 29 Sep 2019 at 16:48, Abhishek Jain <[hidden email]> wrote:

Re:Re: map can't return null

allan

Hi,
OK, thanks, I'll read it. I have another problem, though: I caught the exception, but it still propagated.




At 2019-09-29 17:05:20, "Biao Liu" <[hidden email]> wrote: