Re: Checkpoint fail due to timeout

Posted by Roman Khachatryan on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Checkpoint-fail-due-to-timeout-tp42125p42464.html

Unfortunately, the lock can't be changed as it's part of the public
API (though it will be eliminated with the new source API in FLIP-27).

Theoretically, the change you've made should improve checkpointing at
the cost of throughput. Is that what you see?
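
To illustrate the trade-off, here is a minimal sketch of the two locking
granularities. It is not the actual KafkaFetcher code: the class name
LockGranularitySketch, both emit methods, and the acquisition counter are
hypothetical names for illustration only. With one lock per batch, a
checkpoint has to wait for the whole batch. With one lock per record, it can
get in between records, but the source thread acquires the lock far more often.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class LockGranularitySketch {
    // Stand-in for the checkpoint lock shared by the source thread and the task thread.
    private final Object checkpointLock = new Object();
    // Counts lock acquisitions; only modified while checkpointLock is held.
    private int lockAcquisitions = 0;

    /** Holds the lock for the whole batch: fewer acquisitions, longer hold time. */
    public void emitBatch(List<String> records, Consumer<String> out) {
        synchronized (checkpointLock) {
            lockAcquisitions++;
            for (String record : records) {
                out.accept(record);
            }
        }
    }

    /** Enters and exits the lock for each record: more acquisitions, shorter hold time. */
    public void emitPerRecord(List<String> records, Consumer<String> out) {
        for (String record : records) {
            synchronized (checkpointLock) {
                lockAcquisitions++;
                out.accept(record);
            }
        }
    }

    public int lockAcquisitions() {
        return lockAcquisitions;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("r1", "r2", "r3");
        List<String> sink = new ArrayList<>();

        LockGranularitySketch perBatch = new LockGranularitySketch();
        perBatch.emitBatch(records, sink::add);

        LockGranularitySketch perRecord = new LockGranularitySketch();
        perRecord.emitPerRecord(records, sink::add);

        System.out.println("per-batch acquisitions:  " + perBatch.lockAcquisitions());
        System.out.println("per-record acquisitions: " + perRecord.lockAcquisitions());
    }
}
```

Note that in both variants the emitting thread can still block inside
out.accept (for example, waiting for a buffer) while holding the lock, so the
finer granularity alone does not remove the back-pressure problem.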

But the new stack traces seem strange to me, as emitting the
checkpoint barrier doesn't require a buffer. I also don't see any sign
that the source thread holds the checkpoint lock (something like "locked
<0x000000002af646cc> (a java.lang.Object)"). Could you post or attach
the full thread dump?
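
In case it helps, locked monitors can also be inspected from inside the JVM
with the standard java.lang.management API. The sketch below is only an
illustration: the class LockOwnerDump and the method findMonitorOwner are
hypothetical names, and it assumes the JVM supports object monitor usage
monitoring (HotSpot does). The identity hash code it compares is the same
value that appears in hex in names such as java.lang.Object@2af646cc.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class LockOwnerDump {

    /**
     * Returns the name of a thread that currently holds the monitor of the
     * given object, or null if no live thread reports it as locked.
     */
    public static String findMonitorOwner(Object lock) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        int identityHash = System.identityHashCode(lock);
        String className = lock.getClass().getName();

        // lockedMonitors = true: include the monitors each thread has locked.
        // lockedSynchronizers = false: ownable synchronizers are not needed here.
        for (ThreadInfo info : threads.dumpAllThreads(true, false)) {
            if (info == null) {
                continue;
            }
            for (MonitorInfo monitor : info.getLockedMonitors()) {
                if (monitor.getIdentityHashCode() == identityHash
                        && monitor.getClassName().equals(className)) {
                    return info.getThreadName();
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Object checkpointLock = new Object(); // stand-in for the real lock object
        synchronized (checkpointLock) {
            System.out.println("lock:  " + checkpointLock.getClass().getName()
                    + "@" + Integer.toHexString(System.identityHashCode(checkpointLock)));
            System.out.println("owner: " + findMonitorOwner(checkpointLock));
        }
    }
}
```

For a running TaskManager, a full dump taken with jstack carries the same
information in its "locked <...>" lines.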



Regards,
Roman

On Tue, Mar 23, 2021 at 6:30 AM Alexey Trenikhun <[hidden email]> wrote:

>
> I've changed KafkaFetcher (GKafkaFetcher) to enter and exit the synchronized block on each record. This inverted the behavior: now the Legacy Source thread waits for checkpointLock, while the Source thread is requesting a memory segment.
>
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
>     at sun.misc.Unsafe.park(Native Method)
>     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
>     ...
>
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199
>     at com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
>     -  blocked on java.lang.Object@2af646cc
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
>
> Thanks,
> Alexey
> ________________________________
> From: Roman Khachatryan <[hidden email]>
> Sent: Monday, March 22, 2021 1:36 AM
> To: ChangZhuo Chen (陳昌倬) <[hidden email]>
> Cc: Alexey Trenikhun <[hidden email]>; Flink User Mail List <[hidden email]>
> Subject: Re: Checkpoint fail due to timeout
>
> Thanks for sharing the thread dump.
>
> It shows that the source thread is indeed back-pressured
> (checkpoint lock is held by a thread which is trying to emit but
> unable to acquire any free buffers).
>
> The lock is per task, so there can be several locks per TM.
>
> @ChangZhuo Chen (陳昌倬), in the thread you mentioned it is most likely
> the same issue (but I can't tell for sure without a full thread dump).
>
>
> Regards,
> Roman
>
> On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬) <[hidden email]> wrote:
> >
> > On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> > > Hi Roman,
> > > I took a thread dump:
> > > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> > >     -  blocked on java.lang.Object@5366a0e2
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> > >
> > > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > >     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> > >     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> > >     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> > >     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> > >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> > >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> > >
> > > Is it the checkpoint lock? Is the checkpoint lock per task or per TM? I see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on different objects.
> >
> > Hi,
> >
> > This call stack is similar to our case as described in [0]. Maybe they
> > are the same issue?
> >
> > [0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
> >
> >
> > --
> > ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> > http://czchen.info/
> > Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B