Enabling Checkpointing using FsStatebackend

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Enabling Checkpointing using FsStatebackend

sudhansu069
Hi Team,

We have recently enabled checking pointing using FsStateBackend where we are trying to use S3 bucket as the persistent storage but after enabling it we are running into issues while submitting the job into the cluster.

Can you please let us know if we are missing anything ?


Below is the code sampleĀ  for enabling Checkpointing.

env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/fhirmapper"));
env.enableCheckpointing(1000);




Below logs for the issue.


org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10, backoffTimeMS=15000)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at jdk.internal.reflect.GeneratedMethodAccessor366.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed.


Thanks,
Sudhansu


Reply | Threaded
Open this post in threaded view
|

Re: Enabling Checkpointing using FsStatebackend

Yangze Guo
Hi,

I think the checkpointing is not the root cause of your job failure.
As the log describes, your job failed caused by the authorization
issue of Kafka. "Caused by:
org.apache.kafka.common.errors.TransactionalIdAuthorizationException:
Transactional Id authorization failed."

Best,
Yangze Guo

On Fri, May 7, 2021 at 11:29 PM sudhansu jena
<[hidden email]> wrote:

>
> Hi Team,
>
> We have recently enabled checking pointing using FsStateBackend where we are trying to use S3 bucket as the persistent storage but after enabling it we are running into issues while submitting the job into the cluster.
>
> Can you please let us know if we are missing anything ?
>
>
> Below is the code sample  for enabling Checkpointing.
>
> env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/fhirmapper"));
> env.enableCheckpointing(1000);
>
>
>
> Below logs for the issue.
>
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10, backoffTimeMS=15000)
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
> at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
> at jdk.internal.reflect.GeneratedMethodAccessor366.invoke(Unknown Source)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed.
>
>
> Thanks,
> Sudhansu
>
>