Hi. What causes a buffer pool exception? How can I mitigate it? It is causing us plenty of problems right now.

2021-01-26 04:14:33,041 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 received completion notification for checkpoint with id=4.
2021-01-26 04:14:33,140 WARN  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
2021-01-26 04:14:33,143 INFO  org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch request (sessionId=936633685, epoch=1) to node 2: {}.
org.apache.kafka.common.errors.DisconnectException: null
2021-01-26 04:14:33,146 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 checkpointing for checkpoint with id=5 (max part counter=1).

Then, finally:

ERROR ai.beyond.luminai.sensor.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction [] - Error in timer.
java.lang.RuntimeException: Buffer pool is destroyed.
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at mypackage.MyOperator.collect(MyOperator.java:452) ~[develop-17e9fd0e.jar:?]
    at mypackage.MyOperator.onTimer(MyOperator.java:277) ~[develop-17e9fd0e.jar:?]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1181) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) [flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282) [flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) [flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) [flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) [flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [develop-17e9fd0e.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [develop-17e9fd0e.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:339) ~[develop-17e9fd0e.jar:?]
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:309) ~[develop-17e9fd0e.jar:?]
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:290) ~[develop-17e9fd0e.jar:?]
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266) ~[develop-17e9fd0e.jar:?]
    at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:213) ~[develop-17e9fd0e.jar:?]
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:294) ~[develop-17e9fd0e.jar:?]
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103) ~[develop-17e9fd0e.jar:?]
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:149) ~[develop-17e9fd0e.jar:?]
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120) ~[develop-17e9fd0e.jar:?]
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60) ~[develop-17e9fd0e.jar:?]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
Hi Marco,

The network buffer pool is destroyed when the task manager is shut down. Could you check whether there is an error before that in your log? It seems the timer is triggered at a point where it shouldn't be.

I'll check if there is a known issue that has been fixed in later versions. Do you have the option to upgrade to 1.11.3?

Best,

Arvid

On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <[hidden email]> wrote:
Actually, the log I sent in my previous message shows the only error that occurred before the buffer pool was destroyed. That intermittent warning:

2021-01-26 04:14:33,140 WARN  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
2021-01-26 04:14:33,143 INFO  org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch request (sessionId=936633685, epoch=1) to node 2: {}.
org.apache.kafka.common.errors.DisconnectException: null

I know that probably doesn't help much. Sorry.

On Mon, Jan 25, 2021 at 11:44 PM Arvid Heise <[hidden email]> wrote:
Hi Marco,

Could you share your full task manager and job manager logs? We double-checked and saw that the buffer pool is only released on cancellation or shutdown. So I'm assuming that there is another issue (e.g., the Kafka cluster not being reachable) and a race condition while shutting down. The buffer pool exception then seems to shadow the actual cause, for reasons yet unknown (that is an issue in its own right, but you should be able to see the actual problem in the task manager log).

Best,

Arvid

On Tue, Jan 26, 2021 at 5:13 PM Marco Villalobos <[hidden email]> wrote:
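[Editor's note: the "shadowing" described here can be illustrated with a stdlib-only Java sketch. None of this is Flink code; the class names, messages, and the `run` helper are made up for illustration. A secondary failure raised during shutdown replaces the original exception unless the original is attached, e.g., via `Throwable.addSuppressed`.]

```java
// Illustration (not Flink code): a late operation against an already-destroyed
// resource raises its own exception, which can shadow the failure that caused
// the shutdown in the first place unless the original is kept as suppressed.
public class ShadowingSketch {
    static class BufferPool {
        private boolean destroyed;
        void destroy() { destroyed = true; }
        void request() {
            if (destroyed) throw new IllegalStateException("Buffer pool is destroyed.");
        }
    }

    static Throwable run() {
        BufferPool pool = new BufferPool();
        try {
            // Hypothetical root cause that triggers task cancellation.
            throw new RuntimeException("actual cause: e.g. Kafka broker unreachable");
        } catch (RuntimeException actual) {
            try {
                pool.destroy();  // shutdown path triggered by the failure
                pool.request();  // a late timer still tries to emit a record
            } catch (IllegalStateException shadow) {
                shadow.addSuppressed(actual);  // without this, the root cause is lost
                return shadow;
            }
            return actual;
        }
    }

    public static void main(String[] args) {
        Throwable t = run();
        // The surfaced exception is the shadow; the root cause survives as suppressed.
        System.out.println(t.getMessage());
        System.out.println(t.getSuppressed()[0].getMessage());
    }
}
```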
Hi Marco,

In general, sending a compressed log to the mailing list is totally fine. You can further minimize the log by disabling restarts. I looked into the logs that you provided.
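[Editor's note: for reference, a sketch of how restarts can be disabled for such a debug run in flink-conf.yaml. The key below matches Flink 1.11-era configuration; verify against the version in use.]

```yaml
# flink-conf.yaml — disable automatic restarts so the job fails fast and the
# log contains only the first failure, not repeated restart/failure cycles.
restart-strategy: none
```

The same can be set per job via `env.setRestartStrategy(RestartStrategies.noRestart())`.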
I can see that my suspicion is most likely correct: the task is first canceled for some reason, and a later timer then shows you the respective error. I created a ticket to resolve the issue [1]. There may also be an issue of swallowed interruption exceptions, which we are looking into in parallel.

However, there is a reason why the task is canceling in the first place, and we need to find it. I recommend not having a try-catch block around collector.collect in ForwardFillKeyedProcessFunction. Keep it around your user code only, not around system calls; otherwise it may swallow the real cause.

Are you executing the code in an IDE? You may be able to set some breakpoints to quickly figure out what's going wrong (I can help then).

On Wed, Jan 27, 2021 at 8:54 AM Arvid Heise <[hidden email]> wrote:
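[Editor's note: a minimal, stdlib-only sketch of the scoping recommended above. `Collector` here is a stand-in interface, not Flink's `org.apache.flink.util.Collector`, and `computeForwardFill` is a hypothetical user function. The point is to guard only the user logic and let failures from the framework's `collect()` call propagate unwrapped.]

```java
// Sketch of the try-catch scoping from the thread: user code is guarded and
// rethrown with context; the collect() call sits outside the guard, so a
// system-level failure (such as a destroyed buffer pool) surfaces as-is
// instead of being masked by the handler for user-code errors.
public class TimerScopeSketch {
    interface Collector<T> { void collect(T value); }

    static void onTimer(long timestamp, Collector<String> out) {
        String value;
        try {
            value = computeForwardFill(timestamp);  // user logic only
        } catch (RuntimeException e) {
            throw new RuntimeException("Error in user timer logic at " + timestamp, e);
        }
        out.collect(value);  // system call, deliberately unguarded
    }

    // Hypothetical user function standing in for the forward-fill computation.
    static String computeForwardFill(long ts) {
        return "fill@" + ts;
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        onTimer(42L, sb::append);
        System.out.println(sb);  // prints fill@42
    }
}
```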
Also, could you please provide the jobmanager log? The underlying failure could also be somewhere else.

On Thu, Jan 28, 2021 at 10:17 AM Arvid Heise <[hidden email]> wrote:
Regarding the try-catch block: it rethrows the exception. Here is the code:

    catch (RuntimeException e) {
        logger.error("Error in timer.", e);
        throw e;
    }

That would be okay, right?

Also, just to be clear, does disabling restarts make it easier for you to debug?

On Thu, Jan 28, 2021 at 1:17 AM Arvid Heise <[hidden email]> wrote:
Regarding the try-catch block: sorry, I meant the try-catch in SensorMessageToSensorTimeSeriesFunction.
Yes, the log will be quite small then. Currently, it's just repeating the same things a couple of times.

By the way, if you also have a second task manager, that log would be even more interesting. So it's best to attach all logs (JM + TMs).

On Thu, Jan 28, 2021 at 4:24 PM Marco Villalobos <[hidden email]> wrote: