Hi.
I'm running a Flink application (version 1.8.0) that uses FlinkKafkaConsumer to fetch topic data and perform transformations on it, with the state backend configured as below:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5_000, CheckpointingMode.AT_LEAST_ONCE);
    env.setStateBackend((StateBackend) new FsStateBackend("file:///test"));
    env.getCheckpointConfig().setCheckpointTimeout(30_000);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);

My problem is with the Kafka brokers: the cluster has 5 brokers in total, of which 3 are operating and 2 are down. I was able to consume the data, but when a checkpoint was triggered it threw this exception:

    [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18] o.a.f.r.c.CheckpointCoordinator - Decline checkpoint 6 by task 457b1f801fee89d6f9544409877e29d8 of job 1c46aa5719bac1f0bea436d460b79db1.
    [INFO ] 2019-07-22 12:29:14.636 [flink-akka.actor.default-dispatcher-28] o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out 457b1f801fee89d6f9544409877e29d8.
    [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18] o.a.f.r.c.CheckpointCoordinator - Discarding checkpoint 6 of job 1c46aa5719bac1f0bea436d460b79db1.
    org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source -> Sink: Print to Std. Out (2/4) was not running
        at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1198) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:700) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) ~[?:?]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_201]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) ~[akka-actor_2.11-2.4.20.jar:?]
        at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:502) ~[akka-actor_2.11-2.4.20.jar:?]
        at akka.actor.Actor$class.aroundReceive(Actor.scala) ~[akka-actor_2.11-2.4.20.jar:?]
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:?]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) ~[akka-actor_2.11-2.4.20.jar:?]
        at akka.actor.ActorCell.invoke(ActorCell.scala:495) ~[akka-actor_2.11-2.4.20.jar:?]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) ~[akka-actor_2.11-2.4.20.jar:?]
        at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:?]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:?]
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.12.jar:?]
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.12.jar:?]
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:?]
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:?]
    [INFO ] 2019-07-22 12:29:14.637 [flink-akka.actor.default-dispatcher-28] o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (2/4) (457b1f801fee89d6f9544409877e29d8) switched from RUNNING to FAILED.
    org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

My question (as I try to understand what the checkpoint is doing): why is it trying to fetch topic metadata from the brokers that are down?

Thanks,
Yitzchak.
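For context, the consumer properties look roughly like the sketch below; the broker hostnames and group id are placeholders, not the real cluster configuration. The point of listing all five brokers in bootstrap.servers is that the Kafka client only needs one reachable address from the list to bootstrap its metadata:

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Sketch of the Kafka consumer properties passed to FlinkKafkaConsumer.
    // Hostnames and group id are placeholders.
    static Properties consumerProperties() {
        Properties props = new Properties();
        // Listing every broker (including currently-down ones) lets the
        // client fall back to any broker that is still alive.
        props.setProperty("bootstrap.servers",
                "broker1:9092,broker2:9092,broker3:9092,broker4:9092,broker5:9092");
        props.setProperty("group.id", "flink-consumer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties();
        // Print how many bootstrap addresses are configured.
        System.out.println(props.getProperty("bootstrap.servers").split(",").length);
    }
}
```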
Hi.

Another question: what will happen during a triggered checkpoint if one of the Kafka brokers is unavailable?

I will appreciate your insights. Thanks.

On Mon, Jul 22, 2019 at 12:42 PM Yitzchak Lieberman <[hidden email]> wrote:
Hi Yitzchak,

Thanks for reaching out. I'm not an expert on the Kafka consumer, but I think the number of partitions and the number of source tasks might be interesting to know. Maybe Gordon (in CC) has an idea of what's going wrong here.

Best,
Fabian

On Tue, Jul 23, 2019 at 08:50, Yitzchak Lieberman <[hidden email]> wrote:
Hi.

Any ideas about this exception?

Thanks,
Yitzchak.

On Tue, Jul 23, 2019 at 12:59 PM Fabian Hueske <[hidden email]> wrote:
Hi.

It turned out that the cause was non-replicated topics (replication factor = 1) in Kafka: with two of the five brokers down, any partition whose single replica lived on a dead broker had no available leader, so the metadata fetch timed out.

On Wed, Jul 24, 2019 at 4:20 PM Yitzchak Lieberman <[hidden email]> wrote:
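For anyone hitting the same thing, one quick way to verify is to describe the topic and look at the ReplicationFactor it reports. The topic name and broker/ZooKeeper addresses below are placeholders, and the exact tool flags depend on the Kafka version in use:

```shell
# Describe the topic and check its ReplicationFactor. With RF=1, a single
# broker outage takes the affected partitions fully offline.
kafka-topics.sh --describe --topic my-topic --bootstrap-server broker1:9092

# Older Kafka distributions take a ZooKeeper address instead:
# kafka-topics.sh --describe --topic my-topic --zookeeper zk1:2181
```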