Checkpoint fail due to timeout

Alexey Trenikhun
Hello,
We are experiencing a problem with checkpoints failing due to timeout (the timeout is already set to 30 minutes, and they still fail). Before they started to fail, checkpoints were not very big, around 1.2 GB. It looks like one of the sources (Kafka) never acknowledged the checkpoint (see attached screenshot). What could be the reason?

Thanks,
Alexey



Attachment: Screen Shot 2021-03-10 at 9.59.34 PM.png (187K)

Re: Checkpoint fail due to timeout

Roman Khachatryan
Hello,

This can have several causes, such as back-pressure, large
snapshots, or bugs.

Could you please share:
- the stats of the previous (successful) checkpoints
- back-pressure metrics for the sources
- which Flink version you use

Regards,
Roman


On Thu, Mar 11, 2021 at 7:03 AM Alexey Trenikhun <[hidden email]> wrote:
> [...]

Re: Checkpoint fail due to timeout

Alexey Trenikhun
Hello Roman,

  • History, details, and summary stats are attached.
  • There is backpressure on all sources except Source:gca-cfg and Source:heartbeat.
  • Flink version: 1.12.1. I also tried 1.12.2, with the same results.
Thanks,
Alexey

From: Roman Khachatryan <[hidden email]>
Sent: Thursday, March 11, 2021 11:49 PM
[...]

Attachment: Screen Shot 2021-03-12 at 11.35.22 AM.png (100K)
Attachment: Screen Shot 2021-03-12 at 11.37.48 AM.png (218K)
Attachment: Screen Shot 2021-03-12 at 11.39.39 AM.png (654K)

Re: Checkpoint fail due to timeout

Roman Khachatryan
Hello Alexey,

Thanks for the details.

It looks like backpressure is indeed the cause of the issue.
You can check that by looking at the checkpoint start delay of the
(successful) checkpoints in the tasks following the suspected source
(digital-itx-eastus2?).
To be sure, you can take a thread dump of (or profile) those sources: the
task thread should be waiting for the checkpoint lock, while the legacy
source thread should be holding it and waiting to output data.
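The lock interaction described above can be modeled in a few lines of plain Java. This is a minimal sketch under stated assumptions, not Flink code: a plain Object stands in for the per-task checkpoint lock, a bounded ArrayBlockingQueue stands in for the task's network buffer pool, and the class and thread names are hypothetical. The source thread ends up WAITING for buffer space while still holding the lock, and the task thread ends up BLOCKED on the lock, which is the pair of states one would look for in a thread dump.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical model (not Flink code): a legacy source thread holds the per-task
 * checkpoint lock while it waits for a free output buffer, so the task thread
 * cannot take the lock to start a checkpoint.
 */
public class CheckpointLockModel {

    // Polls until the thread reaches the expected state or the timeout elapses.
    static void awaitState(Thread t, Thread.State expected, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (t.getState() != expected && System.currentTimeMillis() < deadline) {
            Thread.yield();
        }
    }

    /** Returns {source thread state, task thread state, barrier emitted after release}. */
    public static String[] run() {
        Object checkpointLock = new Object();                         // stands in for the per-task lock
        BlockingQueue<Integer> buffers = new ArrayBlockingQueue<>(2); // stands in for a bounded buffer pool
        boolean[] barrierEmitted = {false};

        Thread source = new Thread(() -> {
            synchronized (checkpointLock) {
                try {
                    for (int i = 0; ; i++) {
                        buffers.put(i); // parks once nobody downstream frees a buffer
                    }
                } catch (InterruptedException e) {
                    // Stop emitting; leaving the synchronized block releases the lock.
                }
            }
        }, "legacy-source");

        Thread task = new Thread(() -> {
            synchronized (checkpointLock) {
                barrierEmitted[0] = true; // a checkpoint can start only once the lock is free
            }
        }, "task");

        source.start();
        awaitState(source, Thread.State.WAITING, 5_000); // source parked on a full pool
        task.start();
        awaitState(task, Thread.State.BLOCKED, 5_000);   // task starved of the lock

        String sourceState = source.getState().name();
        String taskState = task.getState().name();

        source.interrupt(); // simulate the source finally giving up the lock
        try {
            source.join();
            task.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return new String[] {sourceState, taskState, String.valueOf(barrierEmitted[0])};
    }

    public static void main(String[] args) {
        String[] r = run();
        System.out.println("legacy source: " + r[0]);         // WAITING
        System.out.println("task: " + r[1]);                  // BLOCKED
        System.out.println("barrier after release: " + r[2]); // true
    }
}
```

Nothing lets the task thread in until the source thread leaves its synchronized block; in the sketch that happens only because the source is interrupted.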

One way to deal with this is to use the new Kafka source (based on
FLIP-27), which will hopefully be released in 1.13 (it is an
experimental feature in 1.12).

Regards,
Roman

On Fri, Mar 12, 2021 at 8:43 PM Alexey Trenikhun <[hidden email]> wrote:
> [...]

Re: Checkpoint fail due to timeout

Alexey Trenikhun
Hi Roman,
I took a thread dump:
"Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    -  blocked on java.lang.Object@5366a0e2
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

"Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)

Is this the checkpoint lock? Is the checkpoint lock per task or per TM? I see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on different objects.

Thanks,
Alexey

From: Roman Khachatryan <[hidden email]>
Sent: Monday, March 15, 2021 2:16 AM
[...]

Re: Checkpoint fail due to timeout

ChangZhuo Chen (陳昌倬)
On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> [...]

Hi,

This call stack is similar to our case as described in [0]. Maybe they
are the same issue?

[0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: Checkpoint fail due to timeout

Alexey Trenikhun
In my opinion, it looks similar. Were you able to tune Flink to make it work? I'm stuck: I wanted to scale up, hoping to reduce backpressure, but to rescale I need to take a savepoint, which never completes (or at least takes longer than 3 hours).



From: ChangZhuo Chen (陳昌倬)
Sent: Tuesday, March 16, 2021 6:59 AM
[...]

Re: Checkpoint fail due to timeout

ChangZhuo Chen (陳昌倬)
On Wed, Mar 17, 2021 at 05:45:38AM +0000, Alexey Trenikhun wrote:
> In my opinion looks similar. Were you able to tune-up Flink to make it work? I'm stuck with it, I wanted to scale up hoping to reduce backpressure, but to rescale I need to take savepoint, which never completes (at least takes longer than 3 hours).

You can use an aligned checkpoint to scale your job. Just restarting from
the checkpoint with the same jar file and the new parallelism should do the
trick.
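Restoring keyed state at a different parallelism works because keyed state is split into a fixed number of key groups (the job's max parallelism), and each parallel instance owns a contiguous range of them; a restore at a new parallelism only reassigns the ranges, which is why max parallelism must stay unchanged. The arithmetic below is a sketch of that assignment, assumed to match Flink's KeyGroupRangeAssignment (verify against the source for your version). The class name is hypothetical, and operator state such as Kafka offsets is redistributed by a different mechanism that this does not model.

```java
/**
 * Sketch (assumed to mirror Flink's key-group range assignment) of how key
 * groups are split across parallel instances. Keyed state from a checkpoint can
 * be restored at a new parallelism as long as maxParallelism stays the same.
 */
public class KeyGroupRescaling {

    /** Index of the parallel instance that owns the given key group. */
    public static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Inclusive {start, end} range of key groups owned by one parallel instance. */
    public static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism; // ceiling division
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // number of key groups; must stay fixed across restores
        for (int parallelism : new int[] {6, 12}) {
            System.out.println("parallelism " + parallelism + ":");
            for (int i = 0; i < parallelism; i++) {
                int[] r = keyGroupRange(maxParallelism, parallelism, i);
                System.out.println("  subtask " + i + " -> key groups " + r[0] + ".." + r[1]);
            }
        }
    }
}
```

For example, with maxParallelism 128, subtask 0 owns key groups 0..21 at parallelism 6 and 0..10 at parallelism 12; the state for those key groups simply moves with the range.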



Re: Checkpoint fail due to timeout

Alexey Trenikhun
According to [1], checkpoints do not support Flink-specific features such as rescaling, but I can try. Thank you for the suggestions.




From: ChangZhuo Chen (陳昌倬)
Sent: Wednesday, March 17, 2021 12:29 AM
[...]

Re: Checkpoint fail due to timeout

Roman Khachatryan
Thanks for sharing the thread dump.

It shows that the source thread is indeed back-pressured
(the checkpoint lock is held by a thread which is trying to emit but is
unable to acquire any free buffers).

The lock is per task, so there can be several locks per TM.

@ChangZhuo Chen (陳昌倬), the thread you mentioned is most likely
the same issue (but I can't tell for sure without a full thread dump).


Regards,
Roman

On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬) <[hidden email]> wrote:
> [...]

Re: Checkpoint fail due to timeout

Arvid Heise-4
Hi Alexey,

Rescaling from unaligned checkpoints will be supported in the upcoming 1.13 release (expected at the end of April).

Best,

Arvid

On Wed, Mar 17, 2021 at 8:29 AM ChangZhuo Chen (陳昌倬) <[hidden email]> wrote:
> [...]

Re: Checkpoint fail due to timeout

Alexey Trenikhun
Great! I doubt that it will help in my case, however, since even unaligned checkpoints get "stuck" for me. Unlike with aligned checkpoints, after an unaligned checkpoint is triggered, Flink at some moment becomes idle: Kubernetes metrics report very little CPU usage by the container, but the unaligned checkpoint still times out after 3 hours.


From: Arvid Heise <[hidden email]>
Sent: Monday, March 22, 2021 6:58:20 AM
[...]

Re: Checkpoint fail due to timeout

Alexey Trenikhun
Would it help if the checkpoint lock were a fair lock? It looks strange: downstream produces output, so I assume that at some moment buffers become available, but the lock can't be acquired for 3+ hours.
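On the fairness question: the legacy checkpoint lock is a plain java.lang.Object used with synchronized (the dumps in this thread show java.lang.Object monitors), and Java monitors make no fairness guarantee, so a thread that releases the lock and immediately re-acquires it may keep winning. The sketch below shows what a fair lock would change, using a hypothetical ReentrantLock(true) in place of the checkpoint lock; the legacy source API hands out an Object, so this is not something that can be configured in Flink. The emitter takes the lock once per record, and once the checkpoint thread is queued, fairness puts it ahead of the emitter's next acquisition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: with a fair lock and per-record locking, a queued checkpoint gets in. */
public class FairCheckpointLock {

    /** Emits three records under a fair lock; returns the order of records and the barrier. */
    public static List<String> run() {
        ReentrantLock lock = new ReentrantLock(true); // fair: queued threads are served in FIFO order
        List<String> events = new ArrayList<>();      // only touched while holding the lock

        Thread checkpoint = new Thread(() -> {
            lock.lock();
            try {
                events.add("barrier");
            } finally {
                lock.unlock();
            }
        }, "checkpoint");

        for (int i = 0; i < 3; i++) {
            lock.lock();
            try {
                events.add("record-" + i);
                if (i == 0) {
                    checkpoint.start();
                    // Keep holding the lock until the checkpoint thread has queued up for it.
                    while (!lock.hasQueuedThread(checkpoint)) {
                        Thread.yield();
                    }
                }
            } finally {
                lock.unlock(); // fair lock: the queued checkpoint thread is served before our next lock()
            }
        }
        try {
            checkpoint.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [record-0, barrier, record-1, record-2]
    }
}
```

With new ReentrantLock(false) the barrier may land later, because the emitter is allowed to barge ahead of the queued thread, which is also what an intrinsic monitor permits.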


From: Roman Khachatryan <[hidden email]>
Sent: Monday, March 22, 2021 1:36:35 AM
[...]

Re: Checkpoint fail due to timeout

Alexey Trenikhun
I've changed KafkaFetcher (GKafkaFetcher) to enter/exit the synchronized block on each record. This inverted the behavior: now the Legacy Source thread waits for checkpointLock, while the Source thread is requesting a memory segment.

"Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
    ...

"Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199
    at com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
    -  blocked on java.lang.Object@2af646cc
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
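The per-record locking change can be sketched as follows. This is a hypothetical stand-in, not the actual GKafkaFetcher (whose code is not shown in the thread): each record is emitted, and its offset recorded, inside its own synchronized block, so the lock is free between records, and a snapshot taken under the same lock always sees a record count and offset that match. The cost is one lock acquire/release per record instead of one per batch.

```java
/**
 * Hypothetical sketch of per-record locking in a legacy-style source. Names are
 * illustrative; this is not the GKafkaFetcher code from the thread.
 */
public class PerRecordEmitter {
    private final Object checkpointLock = new Object(); // stands in for the task's checkpoint lock
    private long emitted = 0;  // records handed downstream
    private long offset = -1;  // offset of the last emitted record (the state a checkpoint captures)

    /** Emits each record in its own synchronized block, releasing the lock between records. */
    public void emitAll(long[] offsets) {
        for (long o : offsets) {
            synchronized (checkpointLock) {
                emitted++;  // "collect" the record
                offset = o; // and advance the offset atomically with the emission
            }
        }
    }

    /** What a checkpoint would capture; taken under the same lock, so never torn. */
    public long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {emitted, offset};
        }
    }

    /** Emits offsets 0..n-1 while another thread snapshots; false if any snapshot was torn. */
    public static boolean consistentUnderConcurrentSnapshots(int n) {
        PerRecordEmitter e = new PerRecordEmitter();
        long[] offsets = new long[n];
        for (int i = 0; i < n; i++) {
            offsets[i] = i;
        }
        boolean[] ok = {true};
        Thread snapshotter = new Thread(() -> {
            for (int k = 0; k < 10_000; k++) {
                long[] s = e.snapshot();
                if (s[0] != s[1] + 1) { // offsets start at 0, so count == offset + 1
                    ok[0] = false;
                }
            }
        }, "snapshotter");
        snapshotter.start();
        e.emitAll(offsets);
        try {
            snapshotter.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
        long[] last = e.snapshot();
        return ok[0] && last[0] == n && last[1] == n - 1;
    }

    public static void main(String[] args) {
        System.out.println(consistentUnderConcurrentSnapshots(100_000)); // true
    }
}
```

Releasing the lock between records only gives a checkpoint the opportunity to get in; since Java monitors are not fair, it does not guarantee that the checkpoint thread wins the next acquisition.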

Thanks,
Alexey

From: Roman Khachatryan <[hidden email]>
Sent: Monday, March 22, 2021 1:36 AM
To: ChangZhuo Chen (陳昌倬) <[hidden email]>
Cc: Alexey Trenikhun <[hidden email]>; Flink User Mail List <[hidden email]>
Subject: Re: Checkpoint fail due to timeout
 
Thanks for sharing the thread dump.

It shows that the source thread is indeed back-pressured
(checkpoint lock is held by a thread which is trying to emit but
unable to acquire any free buffers).

The lock is per task, so there can be several locks per TM.

@ChangZhuo Chen (陳昌倬) , in the thread you mentioned it is most likely
the same issue (but I can't tell for sure without a full thread dump)


Regards,
Roman

On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬) <[hidden email]> wrote:
>
> On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> > Hi Roman,
> > I took thread dump:
> > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> >     -  blocked on java.lang.Object@5366a0e2
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> >
> > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> >     at sun.misc.Unsafe.park(Native Method)
> >     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> >     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> >     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> >     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> >     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> >
> > Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on different Objects.
>
> Hi,
>
> This call stack is similar to our case as described in [0]. Maybe they
> are the same issue?
>
> [0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

Re: Checkpoint fail due to timeout

Roman Khachatryan
Unfortunately, the lock can't be changed as it's part of the public
API (though it will be eliminated with the new source API in FLIP-27).

Theoretically, the change you've made should improve checkpointing at
the cost of throughput. Is that what you see?
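A minimal sketch of the trade-off described above, under stated assumptions: this is not Flink or Kafka connector code, and the class and method names are hypothetical. A legacy-style source emits records while holding a per-task lock, and the batch size decides how often the lock is released, i.e. how many openings a checkpoint gets. In a real legacy source the lock comes from SourceContext#getCheckpointLock() and records are emitted with ctx.collect().

```java
import java.util.List;

// Hypothetical sketch, not Flink code: a legacy-style source loop that emits records
// while holding a per-task "checkpoint lock". The batch size controls how often the
// lock is released, i.e. how many chances a checkpoint gets to acquire it.
class LockGranularitySketch {
    private final Object checkpointLock = new Object();
    private int lockAcquisitions = 0;
    private long emitted = 0;

    // Emits all records, taking the lock once per batch of batchSize records.
    // batchSize == records.size() models holding the lock for a whole poll result;
    // batchSize == 1 models entering/exiting the synchronized block on each record.
    void emitAll(List<String> records, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        for (int start = 0; start < records.size(); start += batchSize) {
            int end = Math.min(start + batchSize, records.size());
            synchronized (checkpointLock) {
                lockAcquisitions++;
                for (int i = start; i < end; i++) {
                    emit(records.get(i));
                }
            }
            // The lock is free here, so a checkpoint could be triggered between batches.
        }
    }

    private void emit(String record) {
        emitted++; // stand-in for serializing the record and handing it downstream
    }

    int lockAcquisitions() {
        return lockAcquisitions;
    }

    long emitted() {
        return emitted;
    }
}
```

Emitting 500 records with batchSize 500 takes the lock once; with batchSize 1 (roughly the per-record change discussed here) it takes it 500 times. More frequent releases give a checkpoint more chances to run, and the extra lock traffic is presumably where the throughput cost comes from. Neither variant helps if the thread holding the lock is itself blocked waiting for a buffer.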

But the new stack traces seem strange to me as the emission of the
checkpoint barrier doesn't require a buffer. I also don't see that the
source thread holds the checkpoint lock (something like "locked
<0x000000002af646cc> (a java.lang.Object)"). Could you post or attach
the full thread dump?
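To make the pattern in the earlier (Mar 16) dump concrete, here is a hedged model using plain JDK classes: the source thread takes the per-task checkpoint lock and then blocks waiting for a network buffer, so the task thread cannot take the same lock to trigger the checkpoint until downstream frees a buffer. Assumptions: Flink 1.12 uses a plain Object monitor, not a ReentrantLock; tryLock with a timeout is used here only so the demo terminates, and all names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model, not Flink code. The real per-task checkpoint lock is an Object
// monitor; a ReentrantLock is used here only so the demo can time out instead of hanging.
class BufferStarvedSourceSketch {
    final ReentrantLock checkpointLock = new ReentrantLock(); // one lock per task
    final Semaphore freeBuffers = new Semaphore(0);           // 0 permits: buffer pool exhausted
    final CountDownLatch sourceHoldsLock = new CountDownLatch(1);

    // Legacy source thread: takes the lock, then blocks until a buffer is free.
    Thread startSourceThread() {
        Thread source = new Thread(() -> {
            checkpointLock.lock();
            try {
                sourceHoldsLock.countDown();
                freeBuffers.acquireUninterruptibly(); // plays the role of requestMemorySegmentBlocking
                // ... serialize the record into the buffer ...
            } finally {
                checkpointLock.unlock();
            }
        }, "legacy-source");
        source.start();
        return source;
    }

    // Task (mailbox) thread: needs the same lock before it can snapshot and emit a barrier.
    boolean tryTriggerCheckpoint(long timeoutMs) throws InterruptedException {
        if (!checkpointLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false; // the real task thread would stay BLOCKED here instead
        }
        try {
            return true; // snapshot state and emit the barrier here
        } finally {
            checkpointLock.unlock();
        }
    }

    // Returns {checkpoint possible while starved, checkpoint possible after one buffer is freed}.
    static boolean[] demo() {
        BufferStarvedSourceSketch task = new BufferStarvedSourceSketch();
        try {
            Thread source = task.startSourceThread();
            task.sourceHoldsLock.await();          // source now holds the lock
            boolean whileStarved = task.tryTriggerCheckpoint(100);
            task.freeBuffers.release();            // downstream consumes data, frees one buffer
            source.join();                         // source emits its record, releases the lock
            boolean afterRelease = task.tryTriggerCheckpoint(1_000);
            return new boolean[] {whileStarved, afterRelease};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

demo() returns {false, true}: the checkpoint cannot start while the buffer pool is exhausted, but it can once a single buffer is freed. Because the lock is per task, a TM running several tasks has several such locks, which is consistent with threads being blocked on different objects.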



Regards,
Roman

On Tue, Mar 23, 2021 at 6:30 AM Alexey Trenikhun <[hidden email]> wrote:

>
> I've changed KafkaFetcher (GKafkaFetcher) to enter/exit the synchronized block on each record. This inverted the behavior: now the Legacy Source thread waits for checkpointLock, while the Source thread is requesting a memory segment.
>
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
>     at sun.misc.Unsafe.park(Native Method)
>     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
>     ...
>
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199
>     at com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
>     -  blocked on java.lang.Object@2af646cc
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
>
> Thanks,
> Alexey
> ________________________________
> From: Roman Khachatryan <[hidden email]>
> Sent: Monday, March 22, 2021 1:36 AM
> To: ChangZhuo Chen (陳昌倬) <[hidden email]>
> Cc: Alexey Trenikhun <[hidden email]>; Flink User Mail List <[hidden email]>
> Subject: Re: Checkpoint fail due to timeout
>
> Thanks for sharing the thread dump.
>
> It shows that the source thread is indeed back-pressured
> (checkpoint lock is held by a thread which is trying to emit but
> unable to acquire any free buffers).
>
> The lock is per task, so there can be several locks per TM.
>
> @ChangZhuo Chen (陳昌倬), in the thread you mentioned, it is most likely
> the same issue (but I can't tell for sure without a full thread dump).
>
>
> Regards,
> Roman
>
> On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬) <[hidden email]> wrote:
> >
> > On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> > > Hi Roman,
> > > I took thread dump:
> > > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> > >     -  blocked on java.lang.Object@5366a0e2
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> > >
> > > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > >     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> > >     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> > >     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> > >     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> > >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> > >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> > >
> > > Is it the checkpoint lock? Is the checkpoint lock per task or per TM? I see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on different objects.
> >
> > Hi,
> >
> > This call stack is similar to our case as described in [0]. Maybe they
> > are the same issue?
> >
> > [0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
> >
> >
> > --
> > ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> > http://czchen.info/
> > Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

Re: Checkpoint fail due to timeout

Piotr Nowojski-4
In reply to this post by Alexey Trenikhun
Hi Alexey,

You should definitely investigate why the job is stuck. 
1. First of all, is it completely stuck, or is something moving? Use Flink metrics [1] (number of bytes/records processed) and go through all of the operators/tasks to check this.
2. Stack traces like the one you quoted:
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
you can most likely ignore. Such a task ("Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0") is backpressured, and the problem lies downstream.
3. To check which tasks are backpressured, you can also use Flink metrics: check the "isBackPressured" metric. Again, backpressured tasks are most likely not the source of the problem. Check downstream of the backpressured task.
4. The first (most upstream) task that is not backpressured but is accepting/processing data from some backpressured tasks is the interesting one. It's causing the backpressure, and you need to investigate what the problem is. Take a look at its stack traces, and maybe attach a remote profiler and profile its code (if it's making slow progress). Maybe it's stuck in your user code doing something.
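Steps 3 and 4 amount to a small graph search. A sketch, assuming you have already collected each task's isBackPressured value (from the web UI or a metrics reporter; fetching it is outside this sketch) and the task-to-downstream-task edges of the job graph. The helper and the task names used with it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: report every task that is NOT backpressured
// but consumes from at least one backpressured task (the likely bottleneck).
class BottleneckFinder {

    // downstreamOf: task name -> names of the tasks it sends data to
    // isBackPressured: task name -> value of that task's isBackPressured metric
    static List<String> candidates(Map<String, List<String>> downstreamOf,
                                   Map<String, Boolean> isBackPressured) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> edge : downstreamOf.entrySet()) {
            boolean producerBackPressured = isBackPressured.getOrDefault(edge.getKey(), false);
            if (!producerBackPressured) {
                continue; // only look downstream of backpressured tasks
            }
            for (String consumer : edge.getValue()) {
                boolean consumerBackPressured = isBackPressured.getOrDefault(consumer, false);
                if (!consumerBackPressured && !result.contains(consumer)) {
                    result.add(consumer); // first non-backpressured hop after backpressure
                }
            }
        }
        return result;
    }
}
```

For a hypothetical pipeline where "Source -> Filter" (backpressured) feeds "KeyedProcess" (backpressured), which feeds "Window" (not backpressured), which feeds "Sink", candidates(...) returns [Window]: that is the task whose stack traces or profile are worth examining first.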

Please let us know what you have found out.

Piotrek


pon., 22 mar 2021 o 19:18 Alexey Trenikhun <[hidden email]> napisał(a):
Great! However, I doubt that it will help in my case, since even unaligned checkpoints get “stuck”. Unlike with aligned checkpoints, after an unaligned checkpoint is triggered, Flink at some point becomes idle (Kubernetes metrics report very little CPU usage by the container), but the unaligned checkpoint still times out after 3 hours.


From: Arvid Heise <[hidden email]>
Sent: Monday, March 22, 2021 6:58:20 AM
To: ChangZhuo Chen (陳昌倬) <[hidden email]>
Cc: Alexey Trenikhun <[hidden email]>; [hidden email] <[hidden email]>; Flink User Mail List <[hidden email]>
Subject: Re: Checkpoint fail due to timeout
 
Hi Alexey,

rescaling from unaligned checkpoints will be supported with the upcoming 1.13 release (expected at the end of April).

Best,

Arvid

On Wed, Mar 17, 2021 at 8:29 AM ChangZhuo Chen (陳昌倬) <[hidden email]> wrote:
On Wed, Mar 17, 2021 at 05:45:38AM +0000, Alexey Trenikhun wrote:
> In my opinion it looks similar. Were you able to tune Flink to make it work? I'm stuck with it: I wanted to scale up hoping to reduce backpressure, but to rescale I need to take a savepoint, which never completes (or at least takes longer than 3 hours).

You can use an aligned checkpoint to scale your job. Just restarting from
the checkpoint with the same jar file and a new parallelism should do the
trick.
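As background for why restarting from a checkpoint with a new parallelism works for keyed state: Flink splits keyed state into key groups, whose number equals the job's max parallelism, and on restore each subtask is assigned a contiguous range of them. The sketch below reproduces that range arithmetic as I understand it from Flink's KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex; it is an illustration, not library code, and the class name is hypothetical. It also shows why the new parallelism cannot exceed the max parallelism.

```java
// Hypothetical illustration, not library code. Mirrors (as I understand it) the range
// arithmetic in Flink's KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex.
class KeyGroupRescaleSketch {

    // Returns the inclusive {start, end} range of key groups owned by subtask operatorIndex
    // when a job with the given max parallelism runs at the given parallelism.
    static int[] rangeFor(int maxParallelism, int parallelism, int operatorIndex) {
        if (parallelism <= 0 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("need 0 < parallelism <= maxParallelism");
        }
        if (operatorIndex < 0 || operatorIndex >= parallelism) {
            throw new IllegalArgumentException("operatorIndex out of range");
        }
        // start = ceil(i * max / p), end = ceil((i + 1) * max / p) - 1, in integer arithmetic
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }
}
```

With max parallelism 128, parallelism 3 gives the subtask ranges [0, 42], [43, 85] and [86, 127]; restoring the same checkpoint at parallelism 4 regroups the same 128 key groups into [0, 31], [32, 63], [64, 95] and [96, 127].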


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

Re: Checkpoint fail due to timeout

Alexey Trenikhun
In reply to this post by Roman Khachatryan
I also expected checkpointing to improve at the cost of throughput, but in reality I didn't notice a difference in either checkpointing or throughput. The backlog was purged by Kafka, so I can't post a thread dump right now, but I doubt that the problem is gone, so I'll have another chance during the next performance run.

Thanks,
Alexey


From: Roman Khachatryan <[hidden email]>
Sent: Tuesday, March 23, 2021 12:17 AM
To: Alexey Trenikhun <[hidden email]>
Cc: ChangZhuo Chen (陳昌倬) <[hidden email]>; Flink User Mail List <[hidden email]>
Subject: Re: Checkpoint fail due to timeout
 

Re: Checkpoint fail due to timeout

Alexey Trenikhun
In reply to this post by Piotr Nowojski-4
Hi Piotrek,

I can't reproduce the problem anymore. Before, it happened 2-3 times in a row; I had turned off unaligned checkpoints, and now I've turned them back on, but the problem seems to be gone for now. When the problem happened, there was no progress on the source operators. I thought maybe that was by design, i.e. that after acknowledging, a source doesn't produce anything until the checkpoint completes... I also have a union of Kafka sources (~50 partitions each), so maybe it's the same as [1].
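On the question of whether sources stop producing after acknowledging: as far as I know, with aligned checkpoints a source takes its snapshot, forwards the barrier, and carries on emitting; it does not wait for the checkpoint to complete. What pauses is consumption at operators with several input channels, such as a union of Kafka sources: a channel that has already delivered the barrier is held back until the barrier has arrived on every channel. The following is a simplified, hypothetical model of that alignment (the held-back queue stands in for Flink blocking the channel); it is not Flink's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Simplified, hypothetical model of aligned-checkpoint barrier alignment at an operator
// with several input channels. Not Flink code: real Flink blocks the channel instead of
// copying records into a separate queue.
class BarrierAligner {
    private final boolean[] barrierSeen;        // per input channel: barrier already received?
    private int barriersSeen = 0;
    private final Deque<String> heldBack = new ArrayDeque<>();
    final List<String> log = new ArrayList<>(); // processed records plus "SNAPSHOT" markers

    BarrierAligner(int channels) {
        barrierSeen = new boolean[channels];
    }

    void onRecord(int channel, String record) {
        if (barrierSeen[channel]) {
            heldBack.add(record); // channel already delivered the barrier: wait for alignment
        } else {
            log.add(record);      // pre-barrier data is processed normally
        }
    }

    void onBarrier(int channel) {
        if (barrierSeen[channel]) {
            return; // this sketch tracks a single checkpoint at a time
        }
        barrierSeen[channel] = true;
        if (++barriersSeen == barrierSeen.length) {
            log.add("SNAPSHOT"); // all channels aligned: snapshot, then forward the barrier
            Arrays.fill(barrierSeen, false);
            barriersSeen = 0;
            while (!heldBack.isEmpty()) {
                log.add(heldBack.poll()); // unblock: process what was held back
            }
        }
    }
}
```

With two channels, a record arriving on channel 0 after its barrier is processed only after channel 1's barrier arrives and the snapshot is taken. One slow or backpressured input therefore delays alignment, and the snapshot, for the whole operator.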



From: Piotr Nowojski <[hidden email]>
Sent: Tuesday, March 23, 2021 5:31 AM
To: Alexey Trenikhun <[hidden email]>
Cc: Arvid Heise <[hidden email]>; ChangZhuo Chen (陳昌倬) <[hidden email]>; [hidden email] <[hidden email]>; Flink User Mail List <[hidden email]>
Subject: Re: Checkpoint fail due to timeout
 