Hi there,

I have a stream where I reload a huge list from time to time. I know there are various side-input patterns, but none of them seemed perfect, so I stuck with an easy approach: I use a Guava Cache, and if it expires and a new element comes in, processing of that element is blocked until the new list is loaded.

That approach has been running in production for a while now and works fine, as the cache has a mechanism to reload the list only on a real change. Today, however, the list grew from a few hundred MB to multiple GB at a time when the network in general was already a bit congested. One TaskManager needed roughly 4 minutes to load the list, but after 30 seconds it reported that it had lost its connection to ZooKeeper and thus had no more information about the leading JobManager, which led to a crash loop. That crash-and-restart loop continued for 30 minutes, until the list was rolled back and then loaded successfully again.

Now my questions:

* If processing of an element blocks, I understand that it is also not possible to perform checkpoints during that time, but I didn't expect ZooKeeper, heartbeats or other threads of the TaskManager to time out. Was that just a coincidence of the network being congested, or is it something in the design of Flink that a long blocking call can lead to crashes (other than X checkpoints timing out, followed by a configured forced crash)? Which threads can be blocked in Flink during a map in a MapFunction?
* For this approach with a kind of cached reload, should I switch to async I/O or just put the loading of the list in a background thread? In my case, it's not really important that processing is blocked until the list is loaded. And in the case of async I/O: 99.999% of the events would return directly and would thus not be async; it's always just a single one triggering the reload of the list, so it doesn't seem to be perfectly suited here?
I'm running on Flink 1.11, and here's the relevant excerpt from the log:

2020-11-05T09:41:40.933865+01:00 [WARN] Client session timed out, have not heard from server in 33897ms for sessionid 0x374de97ba0afac9
2020-11-05T09:41:40.936488+01:00 [INFO] Client session timed out, have not heard from server in 33897ms for sessionid 0x374de97ba0afac9, closing socket connection and attempting reconnect
2020-11-05T09:41:41.042032+01:00 [INFO] State change: SUSPENDED
2020-11-05T09:41:41.168802+01:00 [WARN] Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-05T09:41:41.169276+01:00 [WARN] Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-05T09:41:41.169514+01:00 [INFO] Close ResourceManager connection e4d1f9acca4ea3c5a793877467218452.
2020-11-05T09:41:41.169514+01:00 [INFO] JobManager for job 0dcfb212136daefbcbfe480c6a260261 with leader id 8440610cd5de998bd6c65f3717de42b8 lost leadership.
2020-11-05T09:41:41.185104+01:00 [INFO] Close JobManager connection for job 0dcfb212136daefbcbfe480c6a260261.
2020-11-05T09:41:41.185354+01:00 [INFO] Attempting to fail task externally .... (c596aafa324b152911cb53ab4e6d1cc2).
2020-11-05T09:41:41.187980+01:00 [WARN] ... (c596aafa324b152911cb53ab4e6d1cc2) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: JobManager responsible for 0dcfb212136daefbcbfe480c6a260261 lost the leadership.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1415)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1300(TaskExecutor.java:173)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Job leader for job id 0dcfb212136daefbcbfe480c6a260261 lost leadership.
    ... 24 more

Best regards
Theo
|
I'd go with the network congestion theory for the time being; in that case the only remedy is to throttle the download of said list, or to somehow reduce its size significantly.

What the task thread is doing doesn't matter with regard to HA; it may cause checkpoints to time out, but should have no other effects (unless it magically consumes all CPU resources of the system).

If you block in any function, then you're just blocking the task thread; nothing else.
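
As a rough illustration of what throttling the download could look like, here is a minimal sketch in plain Java that copies a stream while capping the average transfer rate. The class name `ThrottledCopy`, the method `copy` and the parameter `maxBytesPerSecond` are made up for this example and are not part of Flink or Guava; how the list is actually fetched in the job above is not known, so the sketch only assumes it can be read from an `InputStream`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper, not part of Flink or Guava.
final class ThrottledCopy {

    private ThrottledCopy() {}

    /**
     * Copies everything from in to out and sleeps between chunks so that the
     * average rate since the start of the copy stays at or below maxBytesPerSecond.
     * Returns the number of bytes copied.
     */
    static long copy(InputStream in, OutputStream out, long maxBytesPerSecond)
            throws IOException, InterruptedException {
        byte[] buffer = new byte[8192];
        long total = 0;
        long start = System.nanoTime();
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
            total += n;
            // Time that should have elapsed for `total` bytes at the allowed rate.
            // Computed in double to avoid long overflow for multi-GB transfers.
            long expectedNanos = (long) (total / (double) maxBytesPerSecond * 1e9);
            long aheadNanos = expectedNanos - (System.nanoTime() - start);
            if (aheadNanos > 0) {
                // We are ahead of the allowed rate, so wait for the clock to catch up.
                Thread.sleep(aheadNanos / 1_000_000L, (int) (aheadNanos % 1_000_000L));
            }
        }
        return total;
    }
}
```

Whether a cap like this is enough to keep the ZooKeeper session alive depends on how congested the network is; the right value for `maxBytesPerSecond` would have to be found by experiment.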
On 11/5/2020 10:40 PM, Theo Diefenthal wrote:
|
Hi Theo,
We had a very similar problem with one of our Spark Streaming jobs. The best solution was to create a custom source that holds all external records in a cache, periodically reads the external data, and compares it to the cache. All changed records were then broadcast to the task managers.

We also tried to implement background loading in a separate thread, but that solution was more complicated: we needed to create a shadow copy of the cache and then quickly switch between the two. And with Spark Streaming there were additional problems.

Hope this helps,
Maxim.
|
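
For readers who want to try the background-thread variant discussed in this thread, the "shadow copy, then quick switch" idea can be sketched in plain Java roughly as follows. This is only a sketch under assumptions: the class `BackgroundReloadingList` and its methods are invented for illustration, the `loader` is assumed to return a fresh list that is never mutated afterwards, and neither poster's actual implementation is shown here.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper: readers always see a complete list, reloads happen off the task thread.
final class BackgroundReloadingList<T> implements AutoCloseable {

    private final AtomicReference<List<T>> current;
    private final Supplier<List<T>> loader;
    private final ScheduledExecutorService executor;

    BackgroundReloadingList(Supplier<List<T>> loader, long period, TimeUnit unit) {
        this.loader = loader;
        // The first load is synchronous so that get() never returns null.
        this.current = new AtomicReference<>(loader.get());
        this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "list-reloader");
            t.setDaemon(true);
            return t;
        });
        this.executor.scheduleWithFixedDelay(this::reload, period, period, unit);
    }

    /** Loads a shadow copy and then swaps it in with a single atomic write. */
    void reload() {
        try {
            List<T> shadow = loader.get();
            current.set(shadow);
        } catch (RuntimeException e) {
            // Keep serving the old list. Swallowing the exception also matters because
            // a scheduled task that throws is not scheduled again.
        }
    }

    /** Never blocks on a reload; returns whichever list was swapped in last. */
    List<T> get() {
        return current.get();
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

In a Flink job, an object like this would presumably be created in the `open()` method of a `RichMapFunction` and closed in `close()`, so that `map()` only ever calls `get()`. The trade-off is that elements are processed against the old list while a reload is in flight, and both the old and the new list are held in memory until the swap completes.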