Re: Help me understand this Exception

Posted by Zhijiang(wangzhijiang999) on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Help-me-understand-this-Exception-tp33733p33756.html

Agree with Gordon's below explanation!

Besides that, maybe you can also check the job master's log which might probably show the specific exception to cause this failure.

I was thinking whether it is necessary to improve ExceptionInChainedOperatorException to also provide the message from the wrapped real exception,
then users can easily get the root cause directly, not only for the current message "Could not forward element to next operator".

Best,
Zhijiang

------------------------------------------------------------------
From:Tzu-Li (Gordon) Tai <[hidden email]>
Send Time:2020 Mar. 18 (Wed.) 00:01
Cc:user <[hidden email]>
Subject:Re: Help me understand this Exception

Hi,

The exception stack you posted simply means that the next operator in the chain failed to process the output watermark.
There should be another exception, which would explain why some operator was closed / failed and eventually leading to the above exception.
That would provide more insight to exactly why your job is failing.

Cheers,
Gordon

On Tue, Mar 17, 2020 at 11:27 PM aj <[hidden email]> wrote:
Hi,
I am running a streaming job with generating watermark like this :

public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {
@Override
public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
long timestamp = (long) record.get("event_ts");
LOGGER.info("timestamp----", timestamp);
return timestamp;
}

@Override
public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
// simply emit a watermark with every event
LOGGER.info("extractedTimestamp ", extractedTimestamp);
return new Watermark(extractedTimestamp);
}
}
Please help me understand what this exception means:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
    ... 10 more

--
Thanks & Regards,
Anuj Jain