Exception occurred while processing valve output watermark & NullPointerException

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Exception occurred while processing valve output watermark & NullPointerException

Steve Bistline
Any guidance would be most appreciated.

Thx

Steve
===========================================

java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
	... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
	at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
	at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
	at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282)
	... 14 more
Caused by: java.lang.NullPointerException
	at java.lang.String.contains(String.java:2133)
	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119)
	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114)
	at org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43)
	at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
	... 19 more


==================================================

The code

      // Consume the data streams from AWS Kinesis stream
DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
pt.getRequired("stream"),
new EventSchema(),
kinesisConsumerConfig))
.name("Kinesis Stream Consumer");

//dataStream.print();

DataStream<Event> kinesisStream = dataStream
.assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
.map(event -> (IoTEvent) event);

// Prints the mapped records from the Kinesis stream

//kinesisStream.print();


Pattern<Event, ?> pattern = Pattern
.<Event> begin("first event").subtype(IoTEvent.class)
.where(new IterativeCondition<IoTEvent>()
{
//private static final long serialVersionUID = -6301755149429716724L;

@Override
public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
}
})
.next("second")
.subtype(IoTEvent.class)
.where(new IterativeCondition<IoTEvent>() {
//private static final long serialVersionUID = 2392863109523984059L;

@Override
public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
}
})
.next("third")
.subtype(IoTEvent.class)
.where(new IterativeCondition<IoTEvent>() {
private static final long serialVersionUID = 2392863109523984059L;

@Override
public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
}
})
.next("fourth")
.subtype(IoTEvent.class)
.where(new IterativeCondition<IoTEvent>() {
private static final long serialVersionUID = 2392863109523984059L;

@Override
public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
}
})
.within(Time.seconds(10));


// Match the pattern in the input data stream
PatternStream<Event> patternStream = CEP.pattern(kinesisStream, pattern);

// Detects MOTION pattern match and alert
DataStream<Alert> alerts = patternStream.select(
new PatternSelectFunction<Event, Alert>() {
@Override
public Alert select(Map<String, List<Event>> pattern) throws Exception {
Alert alert = new Alert(pattern);
System.out.printf("AUDIO ALERT\n");


return alert;
}

}).name("Audio Alert Sink");

Reply | Threaded
Open this post in threaded view
|

Re: Exception occurred while processing valve output watermark & NullPointerException

vino yang
Hi Steve,

It seems the NPE caused by the property of the IoTEvent's instance. Can you make sure the property is not null?

Thanks, vino.

Steve Bistline <[hidden email]> 于2018年11月21日周三 上午2:09写道:
Any guidance would be most appreciated.

Thx

Steve
===========================================

java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
	... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
	at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
	at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
	at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282)
	... 14 more
Caused by: java.lang.NullPointerException
	at java.lang.String.contains(String.java:2133)
	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119)
	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114)
	at org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43)
	at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
	... 19 more


==================================================

The code

      // Consume the data streams from AWS Kinesis stream
DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
pt.getRequired("stream"),
new EventSchema(),
kinesisConsumerConfig))
.name("Kinesis Stream Consumer");

//dataStream.print();

DataStream<Event> kinesisStream = dataStream
.assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
.map(event -> (IoTEvent) event);

// Prints the mapped records from the Kinesis stream

//kinesisStream.print();


Pattern<Event, ?> pattern = Pattern
.<Event> begin("first event").subtype(IoTEvent.class)
.where(new IterativeCondition<IoTEvent>()
{
//private static final long serialVersionUID = -6301755149429716724L;

@Override
public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
}
})
.next("second")
.subtype(IoTEvent.class)
.where(new IterativeCondition<IoTEvent>() {
//private static final long serialVersionUID = 2392863109523984059L;

@Override
public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
}
})
.next("third")
.subtype(IoTEvent.class)
.where(new IterativeCondition<IoTEvent>() {
private static final long serialVersionUID = 2392863109523984059L;

@Override
public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
}
})
.next("fourth")
.subtype(IoTEvent.class)
.where(new IterativeCondition<IoTEvent>() {
private static final long serialVersionUID = 2392863109523984059L;

@Override
public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
}
})
.within(Time.seconds(10));


// Match the pattern in the input data stream
PatternStream<Event> patternStream = CEP.pattern(kinesisStream, pattern);

// Detects MOTION pattern match and alert
DataStream<Alert> alerts = patternStream.select(
new PatternSelectFunction<Event, Alert>() {
@Override
public Alert select(Map<String, List<Event>> pattern) throws Exception {
Alert alert = new Alert(pattern);
System.out.printf("AUDIO ALERT\n");


return alert;
}

}).name("Audio Alert Sink");

Reply | Threaded
Open this post in threaded view
|

Re: Exception occurred while processing valve output watermark & NullPointerException

Dawid Wysakowicz-2

Hi,

I think vino is right. It seems that the NullPointerException comes from your condition. Please add handling of the situation when the string that you are comparing is null.

Best,

Dawid


On 21/11/2018 04:32, vino yang wrote:
Hi Steve,

It seems the NPE caused by the property of the IoTEvent's instance. Can you make sure the property is not null?

Thanks, vino.

Steve Bistline <[hidden email]> 于2018年11月21日周三 上午2:09写道:
Any guidance would be most appreciated.

Thx

Steve
===========================================

java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
	... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
	at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
	at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
	at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282)
	... 14 more
Caused by: java.lang.NullPointerException
	at java.lang.String.contains(String.java:2133)
	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119)
	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114)
	at org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43)
	at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
	... 19 more

                

                
==================================================

                
The code

                
      // Consume the data streams from AWS Kinesis stream
        DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
                pt.getRequired("stream"),
                new EventSchema(),
                kinesisConsumerConfig))
                .name("Kinesis Stream Consumer");

       //dataStream.print();

        DataStream<Event> kinesisStream = dataStream
                .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
                .map(event -> (IoTEvent) event);

        // Prints the mapped records from the Kinesis stream

        //kinesisStream.print();


        Pattern<Event, ?> pattern = Pattern
                .<Event> begin("first event").subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>()
                {
                    //private static final long serialVersionUID = -6301755149429716724L;

                    @Override
                    public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                        return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
                    }
                })
                .next("second")
                .subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>() {
                    //private static final long serialVersionUID = 2392863109523984059L;

                    @Override
                    public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                        return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
                    }
                })
                .next("third")
                .subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>() {
                    private static final long serialVersionUID = 2392863109523984059L;

                    @Override
                    public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                        return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
                   }
                })
                .next("fourth")
                .subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>() {
                    private static final long serialVersionUID = 2392863109523984059L;

                    @Override
                    public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                        return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
                    }
                })
                .within(Time.seconds(10));


        // Match the pattern in the input data stream
        PatternStream<Event> patternStream = CEP.pattern(kinesisStream, pattern);

        // Detects MOTION pattern match and alert
        DataStream<Alert> alerts = patternStream.select(
                new PatternSelectFunction<Event, Alert>() {
                    @Override
                    public Alert select(Map<String, List<Event>> pattern) throws Exception {
                        Alert alert = new Alert(pattern);
                        System.out.printf("AUDIO ALERT\n");


                        return alert;
                    }

        }).name("Audio Alert Sink");


signature.asc (849 bytes) Download Attachment