Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks! org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67) at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) ... 5 more Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28) at DataStreamSourceConversion$14.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) ... 14 more Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28) at DataStreamCalcRule$37.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) ... 25 more Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61) at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102) ... 37 more |
I'm just a flink user, not an expert. I've seen that exception before. I have never seen it be the actual error, I usually see it when some other operator is throwing an uncaught exception and busy dying. It seems to me that the prior operator throws this error "Can't forward to the next operator" why? because the next operator's already dead, but the job is busy dying asynchronously, so you get a cloud of errors that sort of surround the root cause. I'd read your logs a little further back. From: Gregory Fee <[hidden email]>
Sent: Thursday, July 19, 2018 9:10:37 PM To: user Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use
an expert opinion. Thanks!
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67) at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) ... 5 more Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28) at DataStreamSourceConversion$14.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) ... 14 more Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28) at DataStreamCalcRule$37.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) ... 25 more Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61) at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102) ... 37 more |
Oh you were asking about the cast exception, I haven't seen that before, sorry to be off topic.
From: Philip Doctor <[hidden email]>
Sent: Thursday, July 19, 2018 9:27:15 PM To: Gregory Fee; user Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
I'm just a flink user, not an expert. I've seen that exception before. I have never seen it be the actual error, I usually see it when some other operator is throwing an uncaught exception and busy dying. It seems to me that the prior operator throws this error "Can't forward to the next operator" why? because the next operator's already dead, but the job is busy dying asynchronously, so you get a cloud of errors that sort of surround the root cause. I'd read your logs a little further back. From: Gregory Fee <[hidden email]>
Sent: Thursday, July 19, 2018 9:10:37 PM To: user Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use
an expert opinion. Thanks!
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67) at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) ... 5 more Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28) at DataStreamSourceConversion$14.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) ... 14 more Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28) at DataStreamCalcRule$37.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) ... 25 more Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61) at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102) ... 37 more |
Hi Gregory, This exception seems a bug, you can create a issues in the JIRA. Thanks, vino. 2018-07-20 10:28 GMT+08:00 Philip Doctor <[hidden email]>:
|
Hi Gregory, I think it is some flink bug. Could you file a JIRA for it? Also which version of flink are you using? Best, Dawid On Fri, 20 Jul 2018 at 04:34, vino yang <[hidden email]> wrote:
|
This is on Flink 1.4.2. I filed it as Flink-9905. Thanks! On Fri, Jul 20, 2018 at 2:51 AM, Dawid Wysakowicz <[hidden email]> wrote:
<form method="post" target="_blank" onsubmit="try {return window.confirm("You are submitting information to an external page.\nAre you sure?");} catch (e) {return false;}"> Gregory Fee |
Hi Greg,
just making sure but is there any asynchrony in your user functions? Any Async I/O operator maybe? Best, Aljoscha
|
Hi Aljoscha! I am not using any async i/o. I do use a trick similar to ContinuousFileReaderOperator where I use another thread to write to the output asynchronously though. On Mon, Jul 23, 2018 at 2:30 AM, Aljoscha Krettek <[hidden email]> wrote:
<form method="post" target="_blank" onsubmit="try {return window.confirm("You are submitting information to an external page.\nAre you sure?");} catch (e) {return false;}"> Gregory Fee |
Ha! Would you be able to share the code for that? If you don't acquire the "checkpoint lock" before writing this would explain the exception.
|
Ah yes! This started happening after I moved the code to do the write outside the lock. The reason I did that is because I've run into some situations where checkpoints seem to stall indefinitely without progress. My suspicion was that there is a deadlock condition but putting more thought into it I haven't been able to come up with exactly how that would occur.
On Mon, Jul 23, 2018 at 8:43 AM, Aljoscha Krettek <[hidden email]> wrote:
<form method="post" target="_blank" onsubmit="try {return window.confirm("You are submitting information to an external page.\nAre you sure?");} catch (e) {return false;}"> Gregory Fee |
Free forum by Nabble | Edit this page |