Flink job getting killed

Giriraj
Hi,  

We are submitting a Flink (1.9.1) job for data processing. It runs fine and processes data for some time (~30 minutes), but then it throws the following exception and the job gets killed:
 2020-04-02 14:15:43,371 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Unnamed (2/4) (45d01514f0fb99602883ca43e997e8f3) switched from RUNNING to FAILED.
java.io.EOFException
        at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
        at org.apache.flink.types.StringValue.readString(StringValue.java:769)
        at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
        at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:91)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.base/java.lang.Thread.run(Unknown Source)


Once the above exception occurs, we also see the following runtime exception:

java.lang.RuntimeException: Buffer pool is destroyed.
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.dell.emc.mars.topology.bl.DiffGenerator.handleCreate(DiffGenerator.java:519)
        at com.dell.emc.mars.topology.bl.DiffGenerator.populateHTable(DiffGenerator.java:294)
        at com.dell.emc.mars.topology.bl.DiffGenerator.compare(DiffGenerator.java:58)
        at com.dell.emc.mars.topology.bl.CompareWithDatabase.flatMap(CompareWithDatabase.java:146)
        at com.dell.emc.mars.topology.bl.CompareWithDatabase.flatMap(CompareWithDatabase.java:22)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.dell.emc.mars.topology.bl.CompareWithModel.flatMap(CompareWithModel.java:110)
        at com.dell.emc.mars.topology.bl.CompareWithModel.flatMap(CompareWithModel.java:24)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.dell.emc.mars.topology.bl.DiscoveryHandler.handleDisHoldJobs(DiscoveryHandler.java:180)
        at com.dell.emc.mars.topology.bl.DiscoveryHandler.lambda$handleEndMessage$0(DiscoveryHandler.java:138)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
        at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
        ... 29 more

We are trying to catch/handle the exception, but the job still gets cancelled by the JobManager.
Any thoughts on what could be going wrong and how to get our arms around it?
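The second trace bottoms out in DiscoveryHandler.lambda$handleEndMessage$0 followed directly by java.lang.Thread.run, which suggests (an inference from the trace, not something confirmed in this thread) that the records are emitted from a thread that DiscoveryHandler starts itself. In plain Java, an exception thrown on such a thread never reaches a try/catch in the code that started it. The minimal sketch below uses only java.util.concurrent to contrast a raw Thread with ExecutorService.submit plus Future.get(), which rethrows the worker's failure on the calling thread; failingWork is a hypothetical stand-in for the real work. This only makes the error visible: the first failure, the EOFException, is thrown inside Flink's network input (StreamTaskNetworkInput) before the record reaches any user function, so user-level exception handling will not keep that Sink task from failing.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class WorkerExceptionDemo {

    // Hypothetical stand-in for the work done on the user-started thread.
    static void failingWork() {
        throw new IllegalStateException("Buffer pool is destroyed.");
    }

    // Raw Thread: the exception stays on the worker thread, so the caller's
    // catch block never runs. Returns the caught message, or null if none.
    static String catchAroundRawThread() {
        Thread worker = new Thread(WorkerExceptionDemo::failingWork);
        // Replace the default handler (which prints to stderr) with a no-op.
        worker.setUncaughtExceptionHandler((thread, error) -> { });
        try {
            worker.start();
            worker.join();
            return null;  // join() returns normally even though the worker died
        } catch (RuntimeException e) {
            return e.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // ExecutorService + Future: get() rethrows the worker's failure, wrapped in
    // ExecutionException, on the calling thread where it can be handled.
    static String catchViaFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> result = pool.submit(WorkerExceptionDemo::failingWork);
            result.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("raw thread: " + catchAroundRawThread());  // raw thread: null
        System.out.println("via Future: " + catchViaFuture());        // via Future: Buffer pool is destroyed.
    }
}
```

With the Future-based variant, the worker's error can at least be logged or handled in one place instead of silently ending the thread.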

Best Regards,
Giriraj

Re: Flink job getting killed

Fabian Hueske
Hi Giriraj,

This looks like the deserialization of a String failed.
Can you isolate the problem to a pair of sending and receiving tasks?
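To illustrate what an EOFException during String deserialization usually means: the reader decodes a length first and then expects that many bytes, so if the buffer ends early (truncated data) or the length itself is wrong (corrupted data), it runs off the end. The minimal sketch below uses java.io's DataOutputStream.writeUTF/DataInputStream.readUTF as a stand-in; Flink's StringValue has its own variable-length encoding, so this is an analogy rather than Flink's wire format. The names encode, decode, failsWithEof and withClaimedLength are hypothetical helpers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

final class TruncatedStringDemo {

    // Encode a String as a 2-byte length prefix followed by its bytes
    // (java.io's modified UTF-8 format, used here only as an analogy).
    static byte[] encode(String s) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(s);
        } catch (IOException e) {
            throw new UncheckedIOException(e);  // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    // Decode: read the length prefix, then insist on exactly that many bytes.
    static String decode(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readUTF();
        }
    }

    // True if decoding only the first `keep` bytes ends in EOFException,
    // i.e. the buffer ran out before the announced string length was read.
    static boolean failsWithEof(byte[] data, int keep) {
        try {
            decode(Arrays.copyOf(data, keep));
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            return false;  // some other decoding problem, not an early end of input
        }
    }

    // Copy of `data` whose 2-byte length prefix claims `claimedLength` bytes,
    // mimicking a corrupted header in front of otherwise intact data.
    static byte[] withClaimedLength(byte[] data, int claimedLength) {
        byte[] copy = data.clone();
        copy[0] = (byte) (claimedLength >>> 8);
        copy[1] = (byte) claimedLength;
        return copy;
    }

    public static void main(String[] args) {
        byte[] full = encode("topology-record");
        System.out.println(failsWithEof(full, full.length));                         // false: complete buffer
        System.out.println(failsWithEof(full, full.length - 3));                     // true: last 3 bytes missing
        System.out.println(failsWithEof(withClaimedLength(full, 40), full.length));  // true: prefix too large
    }
}
```

Both failure modes look identical from the receiving side, which is why narrowing it down to one sending/receiving task pair helps: it tells you which writer produced the bytes the reader could not consume.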

Best, Fabian

On Sun, 5 Apr 2020 at 20:18, Giriraj Chauhan <[hidden email]> wrote: