Recovery problem 1 of 2 in Flink 1.6.3


Recovery problem 1 of 2 in Flink 1.6.3

John Stone-2

This is the first of two recovery problems I'm seeing running Flink 1.6.3 in Kubernetes.  I'm posting them in separate messages for brevity and because the second is not directly related to the first.  Any advice is appreciated.

 

Setup:

Flink 1.6.3 in Kubernetes (flink:1.6.3-hadoop28-scala_2.11).  One JobManager and two TaskManagers (TM_1, TM_2).  Each pod has 4 CPUs.  Each TaskManager has 16 task slots.  High availability is enabled.  S3 (s3a) for storage.  RocksDB with incremental snapshots.  It doesn't matter whether local recovery is enabled - I've managed to replicate the problem with local recovery both enabled and disabled.

 

Problem:

Flink cannot recover a job unless there are the same number of free task slots as the job's parallelism.

 

Replication steps:

Create a job with a parallelism of either 17 or 32 - enough to force the job to use both TMs.  After the job is fully running and has taken a checkpoint, delete one of the TaskManagers (TM_1).  Kubernetes will spawn a new TaskManager (TM_3) which will successfully connect to the JobManager.

 

Actual Behavior:

The running job will be canceled and redeployed but will be caught in a SCHEDULED state (shows as CREATED in the web UI).  The JobManager will repeatedly attempt to request slots from the ResourceManager.  The tasks in the job will never resume.

 

Expected Behavior:

Job should be fully unscheduled from TM_2.  TM_2 and TM_3 should pick up the job.  The job should successfully resume from the last checkpoint.

 

Known Workarounds:

1) Cancel and resubmit the job.

2) Using the above example, have a free TaskManager (TM_4) that also has 16 available slots.
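To make the slot arithmetic behind the problem statement and workaround 2 concrete, here is a minimal sketch. It is not Flink's scheduler; it only models the behavior reported in this post, under the assumption that TM_2's 16 slots stay occupied during the restart so that only idle TaskManagers contribute free slots. The class and method names (`SlotAccounting`, `freeSlots`, `recoversAsObserved`) are hypothetical.

```java
// Simplified model of the slot counts described in this post.
// NOT Flink's scheduling logic; it only reproduces the reported arithmetic.
final class SlotAccounting {
    // From the setup: each TaskManager has 16 task slots.
    static final int SLOTS_PER_TM = 16;

    /** Free slots offered by the given number of idle TaskManagers (hypothetical helper). */
    static int freeSlots(int idleTaskManagers) {
        return idleTaskManagers * SLOTS_PER_TM;
    }

    /**
     * Observed rule from the post: the job only resumes when the free slots
     * cover the job's full parallelism.
     */
    static boolean recoversAsObserved(int parallelism, int idleTaskManagers) {
        return freeSlots(idleTaskManagers) >= parallelism;
    }

    public static void main(String[] args) {
        for (int parallelism : new int[] {17, 32}) {
            // After TM_1 is deleted, only the replacement TM_3 is idle
            // (assumption: TM_2's slots are still held by the old tasks).
            System.out.printf("parallelism=%d, TM_3 only: recovers=%b%n",
                    parallelism, recoversAsObserved(parallelism, 1));
            // Workaround 2: an extra idle TM_4 is available as well.
            System.out.printf("parallelism=%d, TM_3 + TM_4: recovers=%b%n",
                    parallelism, recoversAsObserved(parallelism, 2));
        }
    }
}
```

With one idle TaskManager there are 16 free slots, which is short of both 17 and 32; with two there are 32, which covers either parallelism. That matches the two cases described above.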

 

Log snip:

2019-01-10 19:42:50,299 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Process-Broadcast -> Flat Map -> Sink: Unnamed (29/32) (6078b9c76953c7c27b05b522880d3d1b) switched from CANCELING to CANCELED.

2019-01-10 19:42:50,299 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Try to restart or fail the job streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb) if no longer possible.

2019-01-10 19:42:50,299 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb) switched from state FAILING to RESTARTING.

2019-01-10 19:42:50,299 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Restarting the job streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb).

2019-01-10 19:42:50,302 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb) switched from state RESTARTING to CREATED.

2019-01-10 19:42:50,302 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.

2019-01-10 19:42:50,308 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 1 checkpoints in ZooKeeper.

2019-01-10 19:42:50,308 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 1 checkpoints from storage.

2019-01-10 19:42:50,308 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to retrieve checkpoint 1.

2019-01-10 19:42:50,386 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring job c44a91b76ea99ead6fdf9b13a98c15bb from latest valid checkpoint: Checkpoint 1 @ 1547149215694 for c44a91b76ea99ead6fdf9b13a98c15bb.

2019-01-10 19:42:50,388 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - No master state to restore

2019-01-10 19:42:50,388 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb) switched from state CREATED to RUNNING.

2019-01-10 19:42:50,388 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Purchase Order Kafka Consumer (1/1) (49b728769a3a2b3a3a6ba45cd4445e3b) switched from CREATED to SCHEDULED.

2019-01-10 19:42:50,388 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Purchase Order Bundle Kafka Consumer (1/1) (1220cf4b9f5eb937191bb2232a482899) switched from CREATED to SCHEDULED.

2019-01-10 19:42:50,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: ControlInstruction Kafka Consumer -> Filter -> Filter -> Map (1/1) (29f69ee8fbc208cd7c63e99907d11386) switched from CREATED to SCHEDULED.

2019-01-10 19:42:50,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Ticket Update Kafka Consumer (1/1) (4bdfbcb7280fb7a7c9ea2d5aa02efa41) switched from CREATED to SCHEDULED.

2019-01-10 19:42:50,389 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Requesting new slot [SlotRequestId{83bcd1c29b885a7799bf6e5d73d1961c}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.

2019-01-10 19:42:50,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Purchase Order Product Kafka Consumer (1/1) (9bfdbe9141c8b6715b890a35b026bb3b) switched from CREATED to SCHEDULED.

 

...snip...

 

2019-01-10 19:42:50,471 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Process-Broadcast -> Flat Map -> Sink: Unnamed (32/32) (cd851b29b5533f8a3d4812ac7e8d47ab) switched from CREATED to SCHEDULED.

2019-01-10 19:42:52,733 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 6c84753ff746978c90a2e6d7627e6263 (akka.tcp://flink@flink-taskmanager-5748955d9-4wmj9:6126/user/taskmanager_0) at ResourceManager

2019-01-10 19:42:52,736 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 6c84753ff746978c90a2e6d7627e6263 (akka.tcp://flink@flink-taskmanager-5748955d9-4wmj9:6126/user/taskmanager_0) at ResourceManager

2019-01-10 19:42:55,572 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@flink-taskmanager-5748955d9-vgtl4:6126] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@flink-taskmanager-5748955d9-vgtl4:6126]] Caused by: [flink-taskmanager-5748955d9-vgtl4: Name or service not known]

2019-01-10 19:44:20,388 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: ControlInstruction Kafka Consumer -> Filter -> Filter -> Map (1/1) of job c44a91b76ea99ead6fdf9b13a98c15bb is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.

2019-01-10 19:45:50,388 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: ControlInstruction Kafka Consumer -> Filter -> Filter -> Map (1/1) of job c44a91b76ea99ead6fdf9b13a98c15bb is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
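For anyone wanting to check a longer JobManager log for the same symptom, the following is a rough sketch that lists execution attempts whose last logged transition is SCHEDULED. It assumes the task-level ExecutionGraph line format shown in the snip above ("(<32-hex attempt id>) switched from X to Y."); the class name `StuckTaskFinder` and its methods are hypothetical helpers, not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class StuckTaskFinder {
    // Matches task-level transitions such as
    //   "... (1/1) (49b728769a3a2b3a3a6ba45cd4445e3b) switched from CREATED to SCHEDULED."
    // Job-level lines ("switched from state FAILING to RESTARTING.") do not match,
    // because of the extra word "state".
    private static final Pattern TRANSITION = Pattern.compile(
            "\\(([0-9a-f]{32})\\) switched from (\\w+) to (\\w+)\\.");

    /** Maps each execution attempt id to the last state it was logged as entering. */
    static Map<String, String> lastStates(List<String> logLines) {
        Map<String, String> last = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = TRANSITION.matcher(line);
            if (m.find()) {
                last.put(m.group(1), m.group(3));
            }
        }
        return last;
    }

    /** Attempt ids whose most recent logged state is SCHEDULED. */
    static List<String> stuckInScheduled(List<String> logLines) {
        List<String> stuck = new ArrayList<>();
        for (Map.Entry<String, String> e : lastStates(logLines).entrySet()) {
            if ("SCHEDULED".equals(e.getValue())) {
                stuck.add(e.getKey());
            }
        }
        return stuck;
    }

    public static void main(String[] args) {
        // Two lines taken from the log snip above.
        List<String> sample = Arrays.asList(
                "2019-01-10 19:42:50,299 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb) switched from state FAILING to RESTARTING.",
                "2019-01-10 19:42:50,388 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Purchase Order Kafka Consumer (1/1) (49b728769a3a2b3a3a6ba45cd4445e3b) switched from CREATED to SCHEDULED.");
        System.out.println(stuckInScheduled(sample));
    }
}
```

In practice the lines would come from the JobManager log file rather than an inline list. An attempt only counts as stuck here if no later transition for the same id appears in the input.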

 

Many thanks,

 

John Stone

 


Re: Recovery problem 1 of 2 in Flink 1.6.3

John Stone-2

Is this a known issue?  Should I create a Jira ticket?  Does anyone have anything they would like me to try?  I’m very lost at this point.

 

I’ve now seen this issue happen without destroying pods, i.e. the running job crashes after several hours and fails to recover once all task slots are consumed by stale tasks.  I’m adding additional information in hopes of getting to the bottom of this.

 

Timeline of crash (I do not have all logs as the log had rolled by the time I was able to get the following)

 

TaskManager 1, 2019-01-12 11:32:44, throws the following exception:

 

2019-01-12 11:32:44,170 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Window(SlidingEventTimeWindows(57600000, 14400000), EventTimeTrigger, CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16) (cd737fd979a849a713c5808f96d06cf1).

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 758 for operator Window(SlidingEventTimeWindows(57600000, 14400000), EventTimeTrigger, CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16).}

    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)

    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)

    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)

    …snip…

Caused by: java.lang.Exception: Could not materialize checkpoint 758 for operator Window(SlidingEventTimeWindows(57600000, 14400000), EventTimeTrigger, CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16).

    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)

    ... 6 more

Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://my-bucket/stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f in order to obtain the stream state handle

    at java.util.concurrent.FutureTask.report(FutureTask.java:122)

    at java.util.concurrent.FutureTask.get(FutureTask.java:192)

    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)

    …snip…

Caused by: java.io.IOException: Could not flush and close the file system output stream to s3a://te2-flink/stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f in order to obtain the stream state handle

    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)

    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeStateData(RocksDBKeyedStateBackend.java:2454)

    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.runSnapshot(RocksDBKeyedStateBackend.java:2588)

    at java.util.concurrent.FutureTask.run(FutureTask.java:266)

    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)

    ... 7 more

Caused by: org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.AWSS3IOException: saving output on stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: 379193EB634E1686), S3 Extended Request ID: 3hffGK+DZisRFGwTA/X8bJdruPmvRimlmedS7WLZYUMXJ5z+otVdfQdSJUwjLDtryilapjSesz0=: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: 379193EB634E1686)

    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)

    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:121)

    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)

    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)

    at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)

    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)

    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:314)

    ... 11 more

Caused by: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: 379193EB634E1686), S3 Extended Request ID: 3hffGK+DZisRFGwTA/X8bJdruPmvRimlmedS7WLZYUMXJ5z+otVdfQdSJUwjLDtryilapjSesz0=

    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1579)

    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1249)

    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)

   …snip…
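Since the trace above is deeply nested, a small helper can pull out the "Caused by:" chain so the innermost cause (here the S3 RequestTimeout) is easy to spot. This is only a sketch that assumes the standard Java stack trace layout seen above; `CauseChain` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class CauseChain {
    private static final String PREFIX = "Caused by: ";

    /** Exception class names from "Caused by:" lines, in order of appearance. */
    static List<String> causes(List<String> traceLines) {
        List<String> result = new ArrayList<>();
        for (String raw : traceLines) {
            String line = raw.trim();
            if (!line.startsWith(PREFIX)) {
                continue;
            }
            String rest = line.substring(PREFIX.length());
            // The class name ends at the first colon; the message follows it.
            int colon = rest.indexOf(':');
            result.add(colon < 0 ? rest : rest.substring(0, colon));
        }
        return result;
    }

    /** The last "Caused by:" entry, i.e. the innermost cause, or null if there is none. */
    static String rootCause(List<String> traceLines) {
        List<String> all = causes(traceLines);
        return all.isEmpty() ? null : all.get(all.size() - 1);
    }
}
```

The helper treats its input as a single trace, so a log containing several exceptions would need to be split per exception first.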

 

------

 

TaskManager 2, 2019-01-12 11:43:04,095, then begins throwing the following:

 

2019-01-12 11:43:04,095 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map (12/16) (ef9e2e6c50fbca56995d1293d08c7f59) switched from RUNNING to FAILED.

org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution attempt ef9e2e6c50fbca56995d1293d08c7f59 was not found.

    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:545)

    at sun.reflect.GeneratedMethodAccessor73.invoke(Unknown Source)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)

    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)

    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)

    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)

    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)

    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

    at akka.actor.ActorCell.invoke(ActorCell.scala:495)

    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

    at akka.dispatch.Mailbox.run(Mailbox.scala:224)

    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

2019-01-12 11:43:04,209 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map (13/16) (76128dfbaf40a6e9ae4ec8f7cf53b0bb) switched from RUNNING to FAILED.

org.apache.flink.util.FlinkException: JobManager responsible for 00ec28e4a356a80f48269b0b5f0f5de6 lost the leadership.

    at org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1173)

    at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManager(TaskExecutor.java:856)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:242)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)

    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)

    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)

    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

    at akka.actor.ActorCell.invoke(ActorCell.scala:495)

    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

    at akka.dispatch.Mailbox.run(Mailbox.scala:224)

    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 7123f2e4018e0e7b823c8bbcde5c9e9b timed out.

    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1610)

    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)

    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

    at java.util.concurrent.FutureTask.run(FutureTask.java:266)

    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)

    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)

    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)

    ... 4 more

 

2019-01-12 11:44:00,290 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(SlidingEventTimeWindows(57600000, 14400000), EventTimeTrigger, CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (1/16) (9320e1ac8143dce9ef827d2bea2d274e) switched from RUNNING to FAILED.

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 3814daa5451408e1746b6edccf3469fe@8e9d09d5282e5ed7fb5af4a4e937ce2d not found.

    at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)

    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:111)

    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:155)

    at java.util.TimerThread.mainLoop(Timer.java:555)

    at java.util.TimerThread.run(Timer.java:505)

2019-01-12 11:44:00,292 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Window(SlidingEventTimeWindows(57600000, 14400000), EventTimeTrigger, CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (1/16) (9320e1ac8143dce9ef827d2bea2d274e).

 

 

From: John Stone <[hidden email]>
Date: Thursday, January 10, 2019 at 3:31 PM
To: "[hidden email]" <[hidden email]>
Subject: Recovery problem 1 of 2 in Flink 1.6.3

 


 


Re: Recovery problem 1 of 2 in Flink 1.6.3

Till Rohrmann
Hi John,

This is definitely not how Flink should behave in this situation and could indicate a bug. I couldn't figure out the problem from the logs. Would it be possible to get the full logs of the TMs and the JM at DEBUG log level? That would help me debug the problem further.

Cheers,
Till

On Mon, Jan 14, 2019 at 5:04 PM John Stone <[hidden email]> wrote:

Is this a known issue?  Should I create a Jira ticket?  Does anyone have anything they would like me to try?  I’m very lost at this point.

 

I’ve now seen this issue happen without destroying pods, i.e. the job running crashes after several hours and fails to recover once all task slots are consumed by stale tasks.  I’m adding additional information in hopes of getting to the bottom of this.

 

Timeline of crash (I do not have all logs as the log had rolled by the time I was able to get the following)

 

TaskManager 1, 2019-01-12 11:32:44, throws the following exception:

 

2019-01-12 11:32:44,170 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Window(SlidingEventTimeWindows(57600000, 14400000), EventTimeTrigger, CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16) (cd737fd979a849a713c5808f96d06cf1).

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 758 for operator Window(SlidingEventTimeWindows(57600000, 14400000), EventTimeTrigger, CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16).}

    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)

    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)

    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)

    …snip…

Caused by: java.lang.Exception: Could not materialize checkpoint 758 for operator Window(SlidingEventTimeWindows(57600000, 14400000), EventTimeTrigger, CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16).

    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)

    ... 6 more

Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://my-bucket/stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f in order to obtain the stream state handle

    at java.util.concurrent.FutureTask.report(FutureTask.java:122)

    at java.util.concurrent.FutureTask.get(FutureTask.java:192)

    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)

    …snip…

Caused by: java.io.IOException: Could not flush and close the file system output stream to s3a://te2-flink/stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f in order to obtain the stream state handle

    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)

    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeStateData(RocksDBKeyedStateBackend.java:2454)

    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.runSnapshot(RocksDBKeyedStateBackend.java:2588)

    at java.util.concurrent.FutureTask.run(FutureTask.java:266)

    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)

    ... 7 more

Caused by: org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.AWSS3IOException: saving output on stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: 379193EB634E1686), S3 Extended Request ID: 3hffGK+DZisRFGwTA/X8bJdruPmvRimlmedS7WLZYUMXJ5z+otVdfQdSJUwjLDtryilapjSesz0=: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: 379193EB634E1686)

    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)

    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:121)

    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)

    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)

    at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)

    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)

    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:314)

    ... 11 more

Caused by: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: 379193EB634E1686), S3 Extended Request ID: 3hffGK+DZisRFGwTA/X8bJdruPmvRimlmedS7WLZYUMXJ5z+otVdfQdSJUwjLDtryilapjSesz0=

    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1579)

    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1249)

    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)

   …snip…

 

------

 

TaskManager 2 then begins throwing the following, starting at 2019-01-12 11:43:04,095:

 

2019-01-12 11:43:04,095 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map (12/16) (ef9e2e6c50fbca56995d1293d08c7f59) switched from RUNNING to FAILED.

org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution attempt ef9e2e6c50fbca56995d1293d08c7f59 was not found.

    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:545)

    at sun.reflect.GeneratedMethodAccessor73.invoke(Unknown Source)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)

    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)

    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)

    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)

    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)

    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

    at akka.actor.ActorCell.invoke(ActorCell.scala:495)

    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

    at akka.dispatch.Mailbox.run(Mailbox.scala:224)

    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

2019-01-12 11:43:04,209 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map (13/16) (76128dfbaf40a6e9ae4ec8f7cf53b0bb) switched from RUNNING to FAILED.

org.apache.flink.util.FlinkException: JobManager responsible for 00ec28e4a356a80f48269b0b5f0f5de6 lost the leadership.

    at org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1173)

    at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManager(TaskExecutor.java:856)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:242)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)

    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)

    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)

    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

    at akka.actor.ActorCell.invoke(ActorCell.scala:495)

    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

    at akka.dispatch.Mailbox.run(Mailbox.scala:224)

    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 7123f2e4018e0e7b823c8bbcde5c9e9b timed out.

    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1610)

    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)

    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

    at java.util.concurrent.FutureTask.run(FutureTask.java:266)

    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)

    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)

    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)

    ... 4 more

 

2019-01-12 11:44:00,290 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(SlidingEventTimeWindows(57600000, 14400000), EventTimeTrigger, CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (1/16) (9320e1ac8143dce9ef827d2bea2d274e) switched from RUNNING to FAILED.

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 3814daa5451408e1746b6edccf3469fe@8e9d09d5282e5ed7fb5af4a4e937ce2d not found.

    at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)

    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:111)

    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:155)

    at java.util.TimerThread.mainLoop(Timer.java:555)

    at java.util.TimerThread.run(Timer.java:505)

2019-01-12 11:44:00,292 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Window(SlidingEventTimeWindows(57600000, 14400000), EventTimeTrigger, CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (1/16) (9320e1ac8143dce9ef827d2bea2d274e).

 

 

From: John Stone <[hidden email]>
Date: Thursday, January 10, 2019 at 3:31 PM
To: "[hidden email]" <[hidden email]>
Subject: Recovery problem 1 of 2 in Flink 1.6.3

 

This is the first of two recovery problems I'm seeing running Flink 1.6.3 in Kubernetes.  I'm posting them in separate messages for brevity and because the second is not directly related to the first.  Any advice is appreciated.

 

Setup:

Flink 1.6.3 in Kubernetes (flink:1.6.3-hadoop28-scala_2.11).  One JobManager and two TaskManagers (TM_1, TM_2).  Each pod has 4 CPUs.  Each TaskManager has 16 task slots.  High availability is enabled.  S3 (s3a) for storage.  RocksDB with incremental snapshots.  It doesn't matter if local recovery is enabled - I've managed to replicate the problem with local recovery both enabled and disabled.

 

Problem:

Flink cannot recover a job unless the number of free task slots is at least the job's parallelism.

 

Replication steps:

Create a job with a parallelism of either 17 or 32 - enough to force the job to use both TMs.  After the job is fully running and has taken a checkpoint, delete one of the TaskManagers (TM_1).  Kubernetes will spawn a new TaskManager (TM_3) which will successfully connect to the JobManager.

 

Actual Behavior:

The running job will be canceled and redeployed but will be caught in a SCHEDULED state (shows as CREATED in the web UI).  The JobManager will repeatedly attempt to request slots from the ResourceManager.  The tasks in the job will never resume.

 

Expected Behavior:

Job should be fully unscheduled from TM_2.  TM_2 and TM_3 should pick up the job.  The job should successfully resume from the last checkpoint.

 

Known Workarounds:

1) Cancel and resubmit the job.

2) Using the above example, have a free TaskManager (TM_4) that also has 16 available slots.
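
The slot arithmetic behind the problem statement and the second workaround can be sketched roughly as follows. This is only an illustrative model, not Flink's scheduler: the class and method names are hypothetical, and it assumes default slot sharing so that the job needs one slot per unit of parallelism. The "observed" case simply models the reported symptom by treating none of TM_2's slots as reusable after the failure.

```java
class SlotArithmetic {
    // From the setup above: each TaskManager offers 16 task slots.
    static final int SLOTS_PER_TASK_MANAGER = 16;

    // Assumption: default slot sharing, so required slots == job parallelism.
    static int requiredSlots(int parallelism) {
        return parallelism;
    }

    // idleTaskManagers: TaskManagers whose 16 slots are all free (e.g. TM_3, TM_4).
    // reusableSurvivorSlots: slots on the surviving TaskManager (TM_2) that the
    // restarted job is able to take again.
    static boolean canRestart(int parallelism, int idleTaskManagers, int reusableSurvivorSlots) {
        int available = idleTaskManagers * SLOTS_PER_TASK_MANAGER + reusableSurvivorSlots;
        return available >= requiredSlots(parallelism);
    }

    public static void main(String[] args) {
        for (int parallelism : new int[] {17, 32}) {
            System.out.println("parallelism " + parallelism
                + " expected=" + canRestart(parallelism, 1, 16)      // TM_3 plus all of TM_2
                + " observed=" + canRestart(parallelism, 1, 0)       // only TM_3 counts
                + " withSpareTM=" + canRestart(parallelism, 2, 0));  // TM_3 plus TM_4
        }
    }
}
```

Under these assumptions, both parallelism 17 and 32 fit into TM_2 plus TM_3 in the expected case, neither fits into TM_3 alone, and both fit once a spare TM_4 is available, which is consistent with workaround 2.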

 

Log snip:

2019-01-10 19:42:50,299 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Process-Broadcast -> Flat Map -> Sink: Unnamed (29/32) (6078b9c76953c7c27b05b522880d3d1b) switched from CANCELING to CANCELED.

2019-01-10 19:42:50,299 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Try to restart or fail the job streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb) if no longer possible.

2019-01-10 19:42:50,299 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb) switched from state FAILING to RESTARTING.

2019-01-10 19:42:50,299 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Restarting the job streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb).

2019-01-10 19:42:50,302 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb) switched from state RESTARTING to CREATED.

2019-01-10 19:42:50,302 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.

2019-01-10 19:42:50,308 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 1 checkpoints in ZooKeeper.

2019-01-10 19:42:50,308 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 1 checkpoints from storage.

2019-01-10 19:42:50,308 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to retrieve checkpoint 1.

2019-01-10 19:42:50,386 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring job c44a91b76ea99ead6fdf9b13a98c15bb from latest valid checkpoint: Checkpoint 1 @ 1547149215694 for c44a91b76ea99ead6fdf9b13a98c15bb.

2019-01-10 19:42:50,388 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - No master state to restore

2019-01-10 19:42:50,388 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb) switched from state CREATED to RUNNING.

2019-01-10 19:42:50,388 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Purchase Order Kafka Consumer (1/1) (49b728769a3a2b3a3a6ba45cd4445e3b) switched from CREATED to SCHEDULED.

2019-01-10 19:42:50,388 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Purchase Order Bundle Kafka Consumer (1/1) (1220cf4b9f5eb937191bb2232a482899) switched from CREATED to SCHEDULED.

2019-01-10 19:42:50,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: ControlInstruction Kafka Consumer -> Filter -> Filter -> Map (1/1) (29f69ee8fbc208cd7c63e99907d11386) switched from CREATED to SCHEDULED.

2019-01-10 19:42:50,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Ticket Update Kafka Consumer (1/1) (4bdfbcb7280fb7a7c9ea2d5aa02efa41) switched from CREATED to SCHEDULED.

2019-01-10 19:42:50,389 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Requesting new slot [SlotRequestId{83bcd1c29b885a7799bf6e5d73d1961c}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.

2019-01-10 19:42:50,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Purchase Order Product Kafka Consumer (1/1) (9bfdbe9141c8b6715b890a35b026bb3b) switched from CREATED to SCHEDULED.

 

...snip...

 

2019-01-10 19:42:50,471 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Process-Broadcast -> Flat Map -> Sink: Unnamed (32/32) (cd851b29b5533f8a3d4812ac7e8d47ab) switched from CREATED to SCHEDULED.

2019-01-10 19:42:52,733 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 6c84753ff746978c90a2e6d7627e6263 (akka.tcp://flink@flink-taskmanager-5748955d9-4wmj9:6126/user/taskmanager_0) at ResourceManager

2019-01-10 19:42:52,736 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 6c84753ff746978c90a2e6d7627e6263 (akka.tcp://flink@flink-taskmanager-5748955d9-4wmj9:6126/user/taskmanager_0) at ResourceManager

2019-01-10 19:42:55,572 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@flink-taskmanager-5748955d9-vgtl4:6126] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@flink-taskmanager-5748955d9-vgtl4:6126]] Caused by: [flink-taskmanager-5748955d9-vgtl4: Name or service not known]

2019-01-10 19:44:20,388 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: ControlInstruction Kafka Consumer -> Filter -> Filter -> Map (1/1) of job c44a91b76ea99ead6fdf9b13a98c15bb is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.

2019-01-10 19:45:50,388 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: ControlInstruction Kafka Consumer -> Filter -> Filter -> Map (1/1) of job c44a91b76ea99ead6fdf9b13a98c15bb is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
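
For anyone trying to confirm the same symptom from a JobManager log, one possible approach is to record the last reported state of each execution attempt and list those still in SCHEDULED. The sketch below is a hypothetical helper, not part of Flink; its regular expression is an assumption based only on the task-level "switched from X to Y." lines shown in this log snip, and it deliberately skips the job-level "switched from state ..." lines.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class StuckTaskFinder {
    // Matches e.g. "(cd851b29b5533f8a3d4812ac7e8d47ab) switched from CREATED to SCHEDULED."
    // Group 1 is the execution attempt ID, group 3 is the new state.
    private static final Pattern TRANSITION =
        Pattern.compile("\\(([0-9a-f]{32})\\) switched from (\\w+) to (\\w+)\\.");

    // Returns the most recent state seen for each execution attempt ID.
    static Map<String, String> lastStates(List<String> logLines) {
        Map<String, String> last = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = TRANSITION.matcher(line);
            if (m.find()) {
                last.put(m.group(1), m.group(3));
            }
        }
        return last;
    }

    // Returns the attempt IDs whose last reported state equals the given state.
    static List<String> attemptsInState(List<String> logLines, String state) {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, String> e : lastStates(logLines).entrySet()) {
            if (e.getValue().equals(state)) {
                ids.add(e.getKey());
            }
        }
        return ids;
    }
}
```

Calling `StuckTaskFinder.attemptsInState(lines, "SCHEDULED")` on the lines of a log like the one above would list the attempts that were scheduled but never reported as deployed or running.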

 

Many thanks,

 

John Stone