We have Apache Flink (1.4.2) running on an EMR cluster. We are checkpointing to an S3 bucket, and are pushing about 5,000 records per second through the flows. We recently saw the following error in our logs:

java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@ip-XXX-XXX-XXX-XXX:XXXXXX/user/taskmanager#-XXXXXXX]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.TaskManagerMessages$RequestTaskManagerLog".
    at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
    at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
    at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
    at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:442)
    at akka.dispatch.OnComplete.internal(Future.scala:258)
    at akka.dispatch.OnComplete.internal(Future.scala:256)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)

Immediately after this we got the following in our logs:

2018-07-30 15:08:32,177 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 831 @ 1532963312177
2018-07-30 15:09:46,750 ERROR org.apache.flink.runtime.blob.BlobServerConnection - PUT operation failed
java.io.EOFException: Read an incomplete length
    at org.apache.flink.runtime.blob.BlobUtils.readLength(BlobUtils.java:366)
    at org.apache.flink.runtime.blob.BlobServerConnection.readFileFully(BlobServerConnection.java:403)
    at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:349)
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:114)

At this point, the flow crashed and was not able to recover automatically. However, we were able to restart the flow manually without needing to change the location of the S3 bucket.
The fact that the crash occurred while pushing to S3 makes me think that is the crux of the problem.
Any ideas?

Thanks,
Rafael