Exceptions from collector.collect after cancelling job


Exceptions from collector.collect after cancelling job

Shannon Carey
When I cancel a job, I get many exceptions that look like this:

java.lang.RuntimeException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
... lots of Flink and user code (a flat map function) stack entries here
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
... 43 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:144)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:93)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
... 48 more

It looks like Flink is disabling the objects that the FlatMap collector relies on before disabling the operator itself. Is that expected/normal? Is there anything I should change in my FlatMap function or job code to account for it?

-Shannon

Re: Exceptions from collector.collect after cancelling job

Ufuk Celebi
On Thu, Sep 29, 2016 at 9:29 PM, Shannon Carey <[hidden email]> wrote:
> It looks like Flink is disabling the objects that the FlatMap collector
> relies on before disabling the operator itself. Is that expected/normal? Is
> there anything I should change in my FlatMap function or job code to account
> for it?

Hey Shannon,

Flink actually does cancel the tasks *before* cleaning up the network
resources that throw the root Exception here.

We intentionally don't log Exceptions thrown during cancellation, because user code or operators may still be using the already-closed resources concurrently with the cancellation (which is essentially what your stack traces show), but it looks like in some places we don't respect this.

Can you tell which classes actually log this? It would be good to fix this if possible, as it is very confusing and looks quite bad. I don't expect it to be an actual problem, though.

– Ufuk

Re: Exceptions from collector.collect after cancelling job

Shannon Carey
My flat map function is catching & logging the exception. The try block happens to encompass the call to Collector#collect().

I will move the call to collect outside of the try. That should silence the log message.
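
For reference, a minimal sketch of what I mean (the class name and the parse() helper are just placeholders for my actual flat map logic): only the user logic stays inside the try, and collect() is called after it, so any exception the collector throws during cancellation propagates to Flink instead of being caught and logged by the function.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Sketch only: EventParsingFlatMap and parse() stand in for the real job code.
public class EventParsingFlatMap implements FlatMapFunction<String, String> {

    @Override
    public void flatMap(String value, Collector<String> out) {
        String parsed;
        try {
            // only the user logic that can legitimately fail is guarded here
            parsed = parse(value);
        } catch (Exception e) {
            // log and drop the bad record; collect() is no longer inside the try
            System.err.println("Failed to process record: " + e.getMessage());
            return;
        }
        // exceptions thrown here during cancellation now propagate to Flink
        out.collect(parsed);
    }

    // Placeholder for the real per-record processing.
    private String parse(String value) {
        return value.trim();
    }
}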



