org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

classic Classic list List threaded Threaded
10 messages Options
Reply | Threaded
Open this post in threaded view
|

org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Gregory Fee
Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks!

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)

at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 5 more

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamSourceConversion$14.processElement(Unknown Source)

at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 14 more

Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamCalcRule$37.processElement(Unknown Source)

at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)

at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 25 more

Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)

at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)

at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)

at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)

... 37 more


--
Gregory Fee
Engineer
425.830.4734
Lyft
Reply | Threaded
Open this post in threaded view
|

Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Philip Doctor


I'm just a Flink user, not an expert. I've seen that exception before, but I have never seen it be the actual error; I usually see it when some other operator throws an uncaught exception and is in the process of dying. It seems to me that the upstream operator throws "Could not forward element to next operator" because the next operator is already dead, but the job is dying asynchronously, so you get a cloud of errors surrounding the root cause. I'd read your logs a little further back.


From: Gregory Fee <[hidden email]>
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 
Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks!

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)

at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 5 more

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamSourceConversion$14.processElement(Unknown Source)

at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 14 more

Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamCalcRule$37.processElement(Unknown Source)

at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)

at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 25 more

Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)

at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)

at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)

at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)

... 37 more


--
Gregory Fee
Engineer
425.830.4734
Lyft
Reply | Threaded
Open this post in threaded view
|

Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Philip Doctor

Oh, you were asking about the cast exception. I haven't seen that before; sorry to be off topic.





From: Philip Doctor <[hidden email]>
Sent: Thursday, July 19, 2018 9:27:15 PM
To: Gregory Fee; user
Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 


I'm just a Flink user, not an expert. I've seen that exception before, but I have never seen it be the actual error; I usually see it when some other operator throws an uncaught exception and is in the process of dying. It seems to me that the upstream operator throws "Could not forward element to next operator" because the next operator is already dead, but the job is dying asynchronously, so you get a cloud of errors surrounding the root cause. I'd read your logs a little further back.


From: Gregory Fee <[hidden email]>
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 
Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks!

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)

at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 5 more

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamSourceConversion$14.processElement(Unknown Source)

at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 14 more

Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamCalcRule$37.processElement(Unknown Source)

at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)

at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 25 more

Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)

at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)

at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)

at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)

... 37 more


--
Gregory Fee
Engineer
425.830.4734
Lyft
Reply | Threaded
Open this post in threaded view
|

Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

vino yang
Hi Gregory,

This exception looks like a bug; you can create an issue in JIRA.

Thanks, vino.

2018-07-20 10:28 GMT+08:00 Philip Doctor <[hidden email]>:

Oh, you were asking about the cast exception. I haven't seen that before; sorry to be off topic.





From: Philip Doctor <[hidden email]>
Sent: Thursday, July 19, 2018 9:27:15 PM
To: Gregory Fee; user
Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 


I'm just a Flink user, not an expert. I've seen that exception before, but I have never seen it be the actual error; I usually see it when some other operator throws an uncaught exception and is in the process of dying. It seems to me that the upstream operator throws "Could not forward element to next operator" because the next operator is already dead, but the job is dying asynchronously, so you get a cloud of errors surrounding the root cause. I'd read your logs a little further back.


From: Gregory Fee <[hidden email]>
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 
Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks!

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)

at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 5 more

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamSourceConversion$14.processElement(Unknown Source)

at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 14 more

Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamCalcRule$37.processElement(Unknown Source)

at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)

at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 25 more

Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)

at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)

at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)

at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)

... 37 more


--
Gregory Fee
Engineer
425.830.4734
Lyft

Reply | Threaded
Open this post in threaded view
|

Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Dawid Wysakowicz-2
Hi Gregory,
I think it is a Flink bug. Could you file a JIRA issue for it? Also, which version of Flink are you using?
Best,
Dawid

On Fri, 20 Jul 2018 at 04:34, vino yang <[hidden email]> wrote:
Hi Gregory,

This exception looks like a bug; you can create an issue in JIRA.

Thanks, vino.

2018-07-20 10:28 GMT+08:00 Philip Doctor <[hidden email]>:

Oh, you were asking about the cast exception. I haven't seen that before; sorry to be off topic.





From: Philip Doctor <[hidden email]>
Sent: Thursday, July 19, 2018 9:27:15 PM
To: Gregory Fee; user
Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 


I'm just a Flink user, not an expert. I've seen that exception before, but I have never seen it be the actual error; I usually see it when some other operator throws an uncaught exception and is in the process of dying. It seems to me that the upstream operator throws "Could not forward element to next operator" because the next operator is already dead, but the job is dying asynchronously, so you get a cloud of errors surrounding the root cause. I'd read your logs a little further back.


From: Gregory Fee <[hidden email]>
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 
Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks!

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)

at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 5 more

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamSourceConversion$14.processElement(Unknown Source)

at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 14 more

Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamCalcRule$37.processElement(Unknown Source)

at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)

at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 25 more

Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)

at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)

at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)

at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)

... 37 more


--
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:rgb(73,79,80);font-family:&quot;Helvetica Neue&quot;,Helvetica,Arial,sans-serif;text-decoration:none" target="_blank">425.830.4734</a>
Lyft

Reply | Threaded
Open this post in threaded view
|

Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Gregory Fee
This is on Flink 1.4.2. I filed it as FLINK-9905. Thanks!

On Fri, Jul 20, 2018 at 2:51 AM, Dawid Wysakowicz <[hidden email]> wrote:
Hi Gregory,
I think it is a Flink bug. Could you file a JIRA issue for it? Also, which version of Flink are you using?
Best,
Dawid

On Fri, 20 Jul 2018 at 04:34, vino yang <[hidden email]> wrote:
Hi Gregory,

This exception looks like a bug; you can create an issue in JIRA. 

Thanks, vino.

2018-07-20 10:28 GMT+08:00 Philip Doctor <[hidden email]>:

Oh, you were asking about the cast exception. I haven't seen that before; sorry to be off topic.





From: Philip Doctor <[hidden email]>
Sent: Thursday, July 19, 2018 9:27:15 PM
To: Gregory Fee; user
Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 


I'm just a Flink user, not an expert.  I've seen that exception before.  I have never seen it be the actual error; I usually see it when some other operator is throwing an uncaught exception and is in the middle of dying.  It seems to me that the upstream operator throws "Could not forward element to next operator" because the next operator is already dead, but the job is shutting down asynchronously, so you get a cloud of errors surrounding the root cause.  I'd read your logs a little further back.


From: Gregory Fee <[hidden email]>
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 
Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks!

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)

at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 5 more

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamSourceConversion$14.processElement(Unknown Source)

at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 14 more

Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamCalcRule$37.processElement(Unknown Source)

at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)

at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)

at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 25 more

Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)

at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)

at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)

at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)

... 37 more


--
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:rgb(73,79,80);font-family:&quot;Helvetica Neue&quot;,Helvetica,Arial,sans-serif;text-decoration:none" target="_blank">425.830.4734</a>
Lyft




--
<form method="post" target="_blank" onsubmit="try {return window.confirm(&quot;You are submitting information to an external page.\nAre you sure?&quot;);} catch (e) {return false;}">
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:#494f50;font-family:&#39;Helvetica Neue&#39;,Helvetica,Arial,sans-serif;text-decoration:none" target="_blank">425.830.4734</a>
Lyft
Reply | Threaded
Open this post in threaded view
|

Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Aljoscha Krettek
Hi Greg,

just making sure but is there any asynchrony in your user functions? Any Async I/O operator maybe?

Best,
Aljoscha

On 20. Jul 2018, at 21:53, Gregory Fee <[hidden email]> wrote:

This is on Flink 1.4.2. I filed it as FLINK-9905. Thanks!

On Fri, Jul 20, 2018 at 2:51 AM, Dawid Wysakowicz <[hidden email]> wrote:
Hi Gregory,
I think it is a Flink bug. Could you file a JIRA issue for it? Also, which version of Flink are you using?
Best,
Dawid

On Fri, 20 Jul 2018 at 04:34, vino yang <[hidden email]> wrote:
Hi Gregory,

This exception looks like a bug; you can create an issue in JIRA. 

Thanks, vino.

2018-07-20 10:28 GMT+08:00 Philip Doctor <[hidden email]>:
Oh, you were asking about the cast exception. I haven't seen that before; sorry to be off topic.




From: Philip Doctor <[hidden email]>
Sent: Thursday, July 19, 2018 9:27:15 PM
To: Gregory Fee; user
Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 

I'm just a Flink user, not an expert.  I've seen that exception before.  I have never seen it be the actual error; I usually see it when some other operator is throwing an uncaught exception and is in the middle of dying.  It seems to me that the upstream operator throws "Could not forward element to next operator" because the next operator is already dead, but the job is shutting down asynchronously, so you get a cloud of errors surrounding the root cause.  I'd read your logs a little further back.

From: Gregory Fee <[hidden email]>
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 
Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks!

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
... 5 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
at DataStreamSourceConversion$14.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
... 14 more
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
at DataStreamCalcRule$37.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
... 25 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
... 37 more

--
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:rgb(73,79,80);font-family:&quot;Helvetica Neue&quot;,Helvetica,Arial,sans-serif;text-decoration:none" target="_blank" class="">425.830.4734</a>
Lyft




--
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:#494f50;font-family:'Helvetica Neue',Helvetica,Arial,sans-serif;text-decoration:none" target="_blank" class="">425.830.4734</a>
Lyft

Reply | Threaded
Open this post in threaded view
|

Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Gregory Fee
Hi Aljoscha! I am not using any async I/O. I do, however, use a trick similar to ContinuousFileReaderOperator, where I use another thread to write to the output asynchronously.

On Mon, Jul 23, 2018 at 2:30 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi Greg,

just making sure but is there any asynchrony in your user functions? Any Async I/O operator maybe?

Best,
Aljoscha

On 20. Jul 2018, at 21:53, Gregory Fee <[hidden email]> wrote:

This is on Flink 1.4.2. I filed it as FLINK-9905. Thanks!

On Fri, Jul 20, 2018 at 2:51 AM, Dawid Wysakowicz <[hidden email]> wrote:
Hi Gregory,
I think it is a Flink bug. Could you file a JIRA issue for it? Also, which version of Flink are you using?
Best,
Dawid

On Fri, 20 Jul 2018 at 04:34, vino yang <[hidden email]> wrote:
Hi Gregory,

This exception looks like a bug; you can create an issue in JIRA. 

Thanks, vino.

2018-07-20 10:28 GMT+08:00 Philip Doctor <[hidden email]>:
Oh, you were asking about the cast exception. I haven't seen that before; sorry to be off topic.




From: Philip Doctor <[hidden email]>
Sent: Thursday, July 19, 2018 9:27:15 PM
To: Gregory Fee; user
Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 

I'm just a Flink user, not an expert.  I've seen that exception before.  I have never seen it be the actual error; I usually see it when some other operator is throwing an uncaught exception and is in the middle of dying.  It seems to me that the upstream operator throws "Could not forward element to next operator" because the next operator is already dead, but the job is shutting down asynchronously, so you get a cloud of errors surrounding the root cause.  I'd read your logs a little further back.

From: Gregory Fee <[hidden email]>
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 
Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks!

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
... 5 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
at DataStreamSourceConversion$14.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
... 14 more
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
at DataStreamCalcRule$37.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
... 25 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
... 37 more

--
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:rgb(73,79,80);font-family:&quot;Helvetica Neue&quot;,Helvetica,Arial,sans-serif;text-decoration:none" target="_blank">425.830.4734</a>
Lyft




--
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:#494f50;font-family:&#39;Helvetica Neue&#39;,Helvetica,Arial,sans-serif;text-decoration:none" target="_blank">425.830.4734</a>
Lyft




--
<form method="post" target="_blank" onsubmit="try {return window.confirm(&quot;You are submitting information to an external page.\nAre you sure?&quot;);} catch (e) {return false;}">
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:#494f50;font-family:&#39;Helvetica Neue&#39;,Helvetica,Arial,sans-serif;text-decoration:none" target="_blank">425.830.4734</a>
Lyft
Reply | Threaded
Open this post in threaded view
|

Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Aljoscha Krettek
Ha! Would you be able to share the code for that? If you don't acquire the "checkpoint lock" before writing, that would explain the exception.

On 23. Jul 2018, at 17:37, Gregory Fee <[hidden email]> wrote:

Hi Aljoscha! I am not using any async I/O. I do, however, use a trick similar to ContinuousFileReaderOperator, where I use another thread to write to the output asynchronously.

On Mon, Jul 23, 2018 at 2:30 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi Greg,

just making sure but is there any asynchrony in your user functions? Any Async I/O operator maybe?

Best,
Aljoscha

On 20. Jul 2018, at 21:53, Gregory Fee <[hidden email]> wrote:

This is on Flink 1.4.2. I filed it as FLINK-9905. Thanks!

On Fri, Jul 20, 2018 at 2:51 AM, Dawid Wysakowicz <[hidden email]> wrote:
Hi Gregory,
I think it is a Flink bug. Could you file a JIRA issue for it? Also, which version of Flink are you using?
Best,
Dawid

On Fri, 20 Jul 2018 at 04:34, vino yang <[hidden email]> wrote:
Hi Gregory,

This exception looks like a bug; you can create an issue in JIRA. 

Thanks, vino.

2018-07-20 10:28 GMT+08:00 Philip Doctor <[hidden email]>:
Oh, you were asking about the cast exception. I haven't seen that before; sorry to be off topic.




From: Philip Doctor <[hidden email]>
Sent: Thursday, July 19, 2018 9:27:15 PM
To: Gregory Fee; user
Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 

I'm just a Flink user, not an expert.  I've seen that exception before.  I have never seen it be the actual error; I usually see it when some other operator is throwing an uncaught exception and busy dying.  It seems to me that the prior operator throws the "Can't forward to the next operator" error because the next operator is already dead, but the job is busy dying asynchronously, so you get a cloud of errors that surround the root cause.  I'd read your logs a little further back.

From: Gregory Fee <[hidden email]>
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 
Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks!

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
... 5 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
at DataStreamSourceConversion$14.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
... 14 more
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
at DataStreamCalcRule$37.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
... 25 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
... 37 more

--
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:rgb(73,79,80);font-family:&quot;Helvetica Neue&quot;,Helvetica,Arial,sans-serif;text-decoration:none" target="_blank" class="">425.830.4734</a>
Lyft




--
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:#494f50;font-family:'Helvetica Neue',Helvetica,Arial,sans-serif;text-decoration:none" target="_blank" class="">425.830.4734</a>
Lyft




--
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:#494f50;font-family:'Helvetica Neue',Helvetica,Arial,sans-serif;text-decoration:none" target="_blank" class="">425.830.4734</a>
Lyft

Reply | Threaded
Open this post in threaded view
|

Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Gregory Fee
Ah yes! This started happening after I moved the code to do the write outside the lock. The reason I did that is because I've run into some situations where checkpoints seem to stall indefinitely without progress. My suspicion was that there is a deadlock condition but putting more thought into it I haven't been able to come up with exactly how that would occur.

On Mon, Jul 23, 2018 at 8:43 AM, Aljoscha Krettek <[hidden email]> wrote:
Ha! Would you be able to share the code for that? If you don't acquire the "checkpoint lock" before writing this would explain the exception.

On 23. Jul 2018, at 17:37, Gregory Fee <[hidden email]> wrote:

Hi Aljoscha! I am not using any async I/O. I do use a trick similar to ContinuousFileReaderOperator where I use another thread to write to the output asynchronously, though.

On Mon, Jul 23, 2018 at 2:30 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi Greg,

just making sure but is there any asynchrony in your user functions? Any Async I/O operator maybe?

Best,
Aljoscha

On 20. Jul 2018, at 21:53, Gregory Fee <[hidden email]> wrote:

This is on Flink 1.4.2. I filed it as FLINK-9905. Thanks!

On Fri, Jul 20, 2018 at 2:51 AM, Dawid Wysakowicz <[hidden email]> wrote:
Hi Gregory,
I think it is a Flink bug. Could you file a JIRA issue for it? Also, which version of Flink are you using?
Best,
Dawid

On Fri, 20 Jul 2018 at 04:34, vino yang <[hidden email]> wrote:
Hi Gregory,

This exception looks like a bug; you can create an issue in JIRA.

Thanks, vino.

2018-07-20 10:28 GMT+08:00 Philip Doctor <[hidden email]>:
Oh, you were asking about the cast exception; I haven't seen that before. Sorry to be off topic.




From: Philip Doctor <[hidden email]>
Sent: Thursday, July 19, 2018 9:27:15 PM
To: Gregory Fee; user
Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 

I'm just a Flink user, not an expert.  I've seen that exception before.  I have never seen it be the actual error; I usually see it when some other operator is throwing an uncaught exception and busy dying.  It seems to me that the prior operator throws the "Can't forward to the next operator" error because the next operator is already dead, but the job is busy dying asynchronously, so you get a cloud of errors that surround the root cause.  I'd read your logs a little further back.

From: Gregory Fee <[hidden email]>
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 
Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks!

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
... 5 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
at DataStreamSourceConversion$14.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
... 14 more
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
at DataStreamCalcRule$37.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
... 25 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
... 37 more

--
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:rgb(73,79,80);font-family:&quot;Helvetica Neue&quot;,Helvetica,Arial,sans-serif;text-decoration:none" target="_blank">425.830.4734</a>
Lyft




--
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:#494f50;font-family:&#39;Helvetica Neue&#39;,Helvetica,Arial,sans-serif;text-decoration:none" target="_blank">425.830.4734</a>
Lyft




--
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:#494f50;font-family:&#39;Helvetica Neue&#39;,Helvetica,Arial,sans-serif;text-decoration:none" target="_blank">425.830.4734</a>
Lyft




--
<form method="post" target="_blank" onsubmit="try {return window.confirm(&quot;You are submitting information to an external page.\nAre you sure?&quot;);} catch (e) {return false;}">
Gregory Fee
Engineer
<a href="tel:+14258304734" style="font-size:13px;color:#494f50;font-family:&#39;Helvetica Neue&#39;,Helvetica,Arial,sans-serif;text-decoration:none" target="_blank">425.830.4734</a>
Lyft