Hello,
We are experiencing a problem with checkpoints failing due to timeout (the timeout is already set to 30 minutes, and they still fail). The checkpoints were not too big before they started to fail, around 1.2 GB. It looks like one of the sources (Kafka) never acknowledged the checkpoint (see attached screenshot).
What could be the reason?
Thanks,
Alexey

[Attachment: Screen Shot 2021-03-10 at 9.59.34 PM.png]
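For reference, the 30-minute checkpoint timeout mentioned above is set on the job's CheckpointConfig. A minimal sketch; the checkpoint interval and the class name below are assumptions, not taken from this thread:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint interval is an assumed value; the thread does not state it.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        // The 30-minute timeout discussed above: a checkpoint that is not
        // acknowledged by all tasks within this time is declared failed.
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);
    }
}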
Hello,
This can be caused by several reasons such as back-pressure, large snapshots, or bugs. Could you please share:
- the stats of the previous (successful) checkpoints
- back-pressure metrics for sources
- which Flink version do you use?

Regards,
Roman
Hello Roman,

History, details, and summary stats are attached. There is backpressure on all sources except Source:gca-cfg and Source:heartbeat. Flink version is 1.12.1; I also tried 1.12.2 with the same results.

Thanks,
Alexey
[Attachments: Screen Shot 2021-03-12 at 11.35.22 AM.png, Screen Shot 2021-03-12 at 11.37.48 AM.png, Screen Shot 2021-03-12 at 11.39.39 AM.png]
Hello Alexey,
Thanks for the details. It looks like backpressure is indeed the cause of the issue. You can check that by looking at the (succeeded) checkpoint start delay in the tasks following the suspected source (digital-itx-eastus2?). To be sure, you can take a thread dump of (or profile) those sources: the task thread should be waiting for the checkpoint lock, while the legacy source thread should be holding it and waiting to output data.

One way to deal with this is to use the new Kafka source (based on FLIP-27), which will hopefully be released in 1.13 (it is an experimental feature in 1.12).

Regards,
Roman
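For illustration, a minimal sketch of the FLIP-27-based Kafka source mentioned above (experimental in Flink 1.12); the bootstrap servers, topic, group id, and deserializer below are placeholders, not taken from this thread:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NewKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // All connection settings below are placeholders.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("some-topic")
                .setGroupId("some-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Unlike the legacy FlinkKafkaConsumer, the new source emits records from
        // the task's mailbox thread, so there is no separate legacy source thread
        // holding a checkpoint lock while it waits for output buffers.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .filter(value -> !value.isEmpty());

        env.execute("new-kafka-source-sketch");
    }
}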
Hi Roman,
I took a thread dump:
"Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2
owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
- blocked on java.lang.Object@5366a0e2
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
"Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
at sun.misc.Unsafe.park(Native Method)
- waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
Is this the checkpoint lock? Is the checkpoint lock per task or per TM? I see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on different Objects.
Thanks,
Alexey
Hi,

This call stack is similar to our case as described in [0]. Maybe they are the same issue?

[0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html

--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B
In my opinion it looks similar. Were you able to tune Flink to make it work? I'm stuck with it: I wanted to scale up, hoping to reduce the backpressure, but to rescale I need to take a savepoint, which never completes (or at least takes longer than 3 hours).
You can use an aligned checkpoint to scale your job: just restart from the checkpoint with the same jar file and a new parallelism, and that should do the trick.

--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B
According to [1], checkpoints do not support Flink-specific features like rescaling, but I can try. Thank you for the suggestion.
Thanks for sharing the thread dump.
It shows that the source thread is indeed back-pressured (the checkpoint lock is held by a thread which is trying to emit but is unable to acquire any free buffers).

The lock is per task, so there can be several locks per TM.

@ChangZhuo Chen (陳昌倬), in the thread you mentioned it is most likely the same issue (but I can't tell for sure without a full thread dump).

Regards,
Roman
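To make the locking described above concrete, here is a simplified sketch of the pattern a legacy SourceFunction follows (the Kafka fetcher does the equivalent internally); the fetchNextRecord helper is hypothetical:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LegacySourceSketch implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = fetchNextRecord(); // hypothetical helper, e.g. a Kafka poll

            // The legacy source thread emits under the per-task checkpoint lock.
            // If collect() blocks waiting for a free network buffer (back-pressure),
            // the lock stays held and the checkpoint barrier cannot be injected,
            // which is what the thread dump above shows.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(record);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String fetchNextRecord() {
        return "record"; // placeholder
    }
}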
Hi Alexey,

Rescaling from unaligned checkpoints will be supported with the upcoming 1.13 release (expected at the end of April).

Best,
Arvid
Great! I doubt that it will help in my case, however, since even unaligned checkpoints get "stuck". Unlike with aligned checkpoints, after an unaligned checkpoint is triggered Flink at some point becomes idle (Kubernetes metrics report very little CPU usage by the container), but the unaligned checkpoint still times out after 3 hours.
Would it help if the checkpoint lock were a fair lock? It looks strange: downstream produces output, so I assume that at some moment buffers become available, but the lock can't be acquired for 3+ hours.
I've changed KafkaFetcher (GKafkaFetcher) to enter/exit the synchronized block on each record. It inverted the behavior: now the Legacy Source thread waits for the checkpoint lock, while the Source thread is requesting a memory segment.
"Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
at sun.misc.Unsafe.park(Native Method)
- waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
...
"Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 BLOCKED on java.lang.Object@2af646cc
owned by "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199
at com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
- blocked on java.lang.Object@2af646cc
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Thanks,
Alexey
Unfortunately, the lock can't be changed, as it's part of the public API (though it will be eliminated with the new source API in FLIP-27).

Theoretically, the change you've made should improve checkpointing at the cost of throughput. Is that what you see?

But the new stack traces seem strange to me, as the emission of the checkpoint barrier doesn't require a buffer. I also don't see that the source thread holds the checkpoint lock (something like "locked <0x000000002af646cc> (a java.lang.Object)"). Could you post or attach the full thread dump?

Regards,
Roman
Hi Alexey,

You should definitely investigate why the job is stuck.

1. First of all, is it completely stuck, or is something moving? Use Flink metrics [1] (number of bytes/records processed), and go through all of the operators/tasks to check this.

2. Stack traces like the one you quoted:
   at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
   at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
   you can most likely ignore. Such a task ("Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0") is backpressured, and the problem lies downstream.

3. To check which tasks are backpressured, you can also use Flink metrics - check the "isBackPressured" metric. Again, backpressured tasks are most likely not the source of the problem. Check downstream from the backpressured task.

4. The first (most upstream) task that is not backpressured but is accepting/processing data from backpressured tasks is the interesting one. It's causing the backpressure, and you need to investigate what the problem is. Take a look at its stack traces, maybe attach a remote profiler and profile its code (if it's making slow progress). Maybe it's stuck in your user code doing something.

Please let us know what you have found out.

Piotrek
I also expected an improvement in checkpointing at the cost of throughput, but in reality I didn't notice a difference in either checkpointing or throughput.

The backlog was purged by Kafka, so I can't post a thread dump right now, but I doubt that the problem is gone, so I will have another chance during the next performance run.
Thanks,
Alexey
Hi Piotrek,
I can't reproduce the problem anymore. Before, the problem happened 2-3 times in a row; I turned off unaligned checkpoints and have now turned unaligned checkpoints back on, but the problem seems gone for now. When the problem happened there was no progress on the source operators. I thought maybe it was by design, that after the acknowledgment the source doesn't produce anything until the checkpoint completes... I also have a union of Kafka sources (~50 partitions each), so maybe it is the same as [1].
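For reference, turning unaligned checkpoints off and back on as described above is a single CheckpointConfig switch; a sketch (the interval is an assumption, and whether unaligned checkpoints help depends on where the back-pressure comes from):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointToggleSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Interval is an assumed value.
        env.enableCheckpointing(60_000L);

        // Unaligned checkpoints let barriers overtake buffered in-flight data, which
        // usually speeds up checkpoints under back-pressure; note Arvid's remark above
        // that rescaling from unaligned checkpoints is only supported from Flink 1.13.
        env.getCheckpointConfig().enableUnalignedCheckpoints(true); // pass false to turn them off
    }
}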