Apache-flink -- Checkpointing to S3 Bucket

Chargel, Rafael

We have Apache Flink (1.4.2) running on an EMR cluster. We are checkpointing to an S3 bucket, and are pushing about 5,000 records per second through the flows. We recently saw the following error in our logs:

java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@ip-XXX-XXX-XXX-XXX:XXXXXX/user/taskmanager#-XXXXXXX]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.TaskManagerMessages$RequestTaskManagerLog".
  at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
  at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
  at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
  at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
  at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
  at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:442)
  at akka.dispatch.OnComplete.internal(Future.scala:258)
  at akka.dispatch.OnComplete.internal(Future.scala:256)
  at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
  at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
  at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
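In the trace above, the outer java.util.concurrent.CompletionException is only a wrapper; the underlying failure is the Akka ask timeout carried as its cause. A minimal stdlib sketch of how that wrapping arises, assuming only standard CompletableFuture behavior (java.util.concurrent.TimeoutException stands in for akka.pattern.AskTimeoutException, and CompletionWrapDemo and failureSeenByDependent are hypothetical names):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

public class CompletionWrapDemo {
    /** Fails a source future, then returns the exception observed by a dependent stage. */
    static Throwable failureSeenByDependent() {
        CompletableFuture<String> source = new CompletableFuture<>();
        // Dependent stage registered before the source completes,
        // like a caller waiting on a remote reply.
        CompletableFuture<String> dependent = source.thenApply(s -> s);
        // Stand-in for the Akka ask timing out (assumption: any non-CompletionException cause behaves the same).
        source.completeExceptionally(new TimeoutException("Ask timed out after [10000 ms]"));
        try {
            dependent.join();
            return null;
        } catch (CompletionException e) {
            // The dependent stage stores the original failure wrapped in a CompletionException.
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable t = failureSeenByDependent();
        System.out.println(t.getClass().getName());            // java.util.concurrent.CompletionException
        System.out.println(t.getCause().getClass().getName()); // java.util.concurrent.TimeoutException
    }
}
```

The trace goes through the relay path used by thenCompose rather than thenApply, but both wrap a non-CompletionException cause the same way, so the exception worth investigating is the cause, not the wrapper.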

Immediately after this we got the following in our logs:

2018-07-30 15:08:32,177 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 831 @ 1532963312177
2018-07-30 15:09:46,750 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - PUT operation failed
java.io.EOFException: Read an incomplete length
  at org.apache.flink.runtime.blob.BlobUtils.readLength(BlobUtils.java:366)
  at org.apache.flink.runtime.blob.BlobServerConnection.readFileFully(BlobServerConnection.java:403)
  at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:349)
  at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:114)
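The EOFException message says the blob server's connection ended before it had read a complete length field for the PUT. A hypothetical sketch of a length-prefixed read that fails with the same message; LengthPrefixDemo, readLengthPrefix, the 4-byte field, and the little-endian byte order are assumptions for illustration, not Flink's actual BlobUtils code:

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class LengthPrefixDemo {
    /**
     * Hypothetical reader for a 4-byte little-endian length prefix.
     * Throws EOFException if the stream ends before all 4 bytes arrive.
     */
    static int readLengthPrefix(InputStream in) throws IOException {
        byte[] buf = new byte[4];
        int read = 0;
        while (read < 4) {
            int n = in.read(buf, read, 4 - read);
            if (n < 0) {
                // The sender closed the stream partway through the length field.
                throw new EOFException("Read an incomplete length");
            }
            read += n;
        }
        // Assemble the bytes, least significant first.
        return (buf[0] & 0xff)
                | (buf[1] & 0xff) << 8
                | (buf[2] & 0xff) << 16
                | (buf[3] & 0xff) << 24;
    }

    public static void main(String[] args) throws IOException {
        // Complete prefix: bytes 0x10 0x00 0x00 0x00 encode 16.
        System.out.println(readLengthPrefix(new ByteArrayInputStream(new byte[] {16, 0, 0, 0}))); // 16
        // Truncated prefix: only 2 of the 4 bytes arrive before end of stream.
        try {
            readLengthPrefix(new ByteArrayInputStream(new byte[] {16, 0}));
        } catch (EOFException e) {
            System.out.println(e.getMessage()); // Read an incomplete length
        }
    }
}
```

In other words, the server-side error by itself only shows that the uploading side stopped sending mid-request; it does not say why.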

At this point the flow crashed and was not able to recover automatically. However, we were able to restart it manually without changing the S3 bucket location. The fact that the crash occurred while pushing to S3 makes me think that S3 is the crux of the problem.

Any ideas?

Thanks,

Rafael

PS: I have also posted this on Stack Overflow but have had no responses: https://stackoverflow.com/questions/51597785/apache-flink-error-checkpointing-to-s3