Help me understand this Exception

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Help me understand this Exception

anuj.aj07
Hi,
I am running a streaming job with generating watermark like this :

public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {
@Override
public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
long timestamp = (long) record.get("event_ts");
LOGGER.info("timestamp----", timestamp);
return timestamp;
}

@Override
public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
// simply emit a watermark with every event
LOGGER.info("extractedTimestamp ", extractedTimestamp);
return new Watermark(extractedTimestamp);
}
}
Please help me understand what this exception means:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
    ... 10 more

--
Thanks & Regards,
Anuj Jain



Reply | Threaded
Open this post in threaded view
|

Re: Help me understand this Exception

Tzu-Li (Gordon) Tai
Hi,

The exception stack you posted simply means that the next operator in the chain failed to process the output watermark.
There should be another exception, which would explain why some operator was closed / failed and eventually leading to the above exception.
That would provide more insight to exactly why your job is failing.

Cheers,
Gordon

On Tue, Mar 17, 2020 at 11:27 PM aj <[hidden email]> wrote:
Hi,
I am running a streaming job with generating watermark like this :

public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {
@Override
public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
long timestamp = (long) record.get("event_ts");
LOGGER.info("timestamp----", timestamp);
return timestamp;
}

@Override
public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
// simply emit a watermark with every event
LOGGER.info("extractedTimestamp ", extractedTimestamp);
return new Watermark(extractedTimestamp);
}
}
Please help me understand what this exception means:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
    ... 10 more

--
Thanks & Regards,
Anuj Jain



Reply | Threaded
Open this post in threaded view
|

Re: Help me understand this Exception

Zhijiang(wangzhijiang999)
Agree with Gordon's below explanation!

Besides that, maybe you can also check the job master's log which might probably show the specific exception to cause this failure.

I was thinking whether it is necessary to improve ExceptionInChainedOperatorException to also provide the message from the wrapped real exception,
then users can easily get the root cause directly, not only for the current message "Could not forward element to next operator".

Best,
Zhijiang

------------------------------------------------------------------
From:Tzu-Li (Gordon) Tai <[hidden email]>
Send Time:2020 Mar. 18 (Wed.) 00:01
Cc:user <[hidden email]>
Subject:Re: Help me understand this Exception

Hi,

The exception stack you posted simply means that the next operator in the chain failed to process the output watermark.
There should be another exception, which would explain why some operator was closed / failed and eventually leading to the above exception.
That would provide more insight to exactly why your job is failing.

Cheers,
Gordon

On Tue, Mar 17, 2020 at 11:27 PM aj <[hidden email]> wrote:
Hi,
I am running a streaming job with generating watermark like this :

public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {
@Override
public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
long timestamp = (long) record.get("event_ts");
LOGGER.info("timestamp----", timestamp);
return timestamp;
}

@Override
public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
// simply emit a watermark with every event
LOGGER.info("extractedTimestamp ", extractedTimestamp);
return new Watermark(extractedTimestamp);
}
}
Please help me understand what this exception means:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
    ... 10 more

--
Thanks & Regards,
Anuj Jain




Reply | Threaded
Open this post in threaded view
|

Re: Help me understand this Exception

anuj.aj07
Thanks, Zhijiang and Gordon.  

I will see the logs to find out more.

On Wed, Mar 18, 2020 at 1:44 PM Zhijiang <[hidden email]> wrote:
Agree with Gordon's below explanation!

Besides that, maybe you can also check the job master's log which might probably show the specific exception to cause this failure.

I was thinking whether it is necessary to improve ExceptionInChainedOperatorException to also provide the message from the wrapped real exception,
then users can easily get the root cause directly, not only for the current message "Could not forward element to next operator".

Best,
Zhijiang

------------------------------------------------------------------
From:Tzu-Li (Gordon) Tai <[hidden email]>
Send Time:2020 Mar. 18 (Wed.) 00:01
Cc:user <[hidden email]>
Subject:Re: Help me understand this Exception

Hi,

The exception stack you posted simply means that the next operator in the chain failed to process the output watermark.
There should be another exception, which would explain why some operator was closed / failed and eventually leading to the above exception.
That would provide more insight to exactly why your job is failing.

Cheers,
Gordon

On Tue, Mar 17, 2020 at 11:27 PM aj <[hidden email]> wrote:
Hi,
I am running a streaming job with generating watermark like this :

public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {
@Override
public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
long timestamp = (long) record.get("event_ts");
LOGGER.info("timestamp----", timestamp);
return timestamp;
}

@Override
public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
// simply emit a watermark with every event
LOGGER.info("extractedTimestamp ", extractedTimestamp);
return new Watermark(extractedTimestamp);
}
}
Please help me understand what this exception means:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
    ... 10 more

--
Thanks & Regards,
Anuj Jain






--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07