Hello everyone,
I have a job which is writing some streams into parquet files in S3. I use Flink 1.7.2 on EMR 5.21. My job had been running well, but suddenly it failed to make a checkpoint with the full stack trace mentioned below. After that failure, the job restarted from the last successful checkpoint, but it could not make any further checkpoint - all subsequent checkpoints failed with the same reason. Searching on Internet I could only find one explanation: S3Object has not been closed properly. Could someone please help? Thanks and regards, Averell /The program finished with the following exception: org.apache.flink.util.FlinkException: Triggering a savepoint for the job 8cf68432acb3b4ced2cbc97bc23b4af5 failed. at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723) at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985) at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint failed: Could not perform checkpoint 33 for operator Sink: S3 - Instant (1/4). 
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:972) at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint failed: Could not perform checkpoint 33 for operator Sink: S3 - Instant (1/4). 
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortWithCause(PendingCheckpoint.java:452) at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:447) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1258) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failUnacknowledgedPendingCheckpointsFor(CheckpointCoordinator.java:918) at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyExecutionChange(ExecutionGraph.java:1779) at org.apache.flink.runtime.executiongraph.ExecutionVertex.notifyStateTransition(ExecutionVertex.java:756) at org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1353) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1113) at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:945) at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1576) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542) at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) ... 15 more Caused by: java.lang.Exception: Checkpoint failed: Could not perform checkpoint 33 for operator Sink: S3 - Instant (1/4). ... 30 more Caused by: java.lang.Exception: Could not perform checkpoint 33 for operator Sink: S3 - Instant (1/4). at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595) at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396) at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292) at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception: Could not complete snapshot 33 for operator Sink: S3 - Instant (1/4). at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586) ... 
8 more Caused by: java.io.IOException: Uploading parts failed at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231) at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215) at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151) at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123) at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56) at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167) at org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71) at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:63) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:280) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:253) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:244) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:235) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:347) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395) ... 13 more Caused by: java.io.InterruptedIOException: upload part on output/instant/dt=2019-01-09/part-0-24: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471) at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73) at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318) at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325) at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272) at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306) at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576) at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) ... 12 more Caused by: org.apache.flink.fs.s3base.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool at org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:292) at org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:269) at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.$Proxy29.get(Unknown Source) at org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191) at org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185) at org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) at org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) at org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1236) at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056) ... 25 more / -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi Averell, Did you have other failures before (from which you managed to resume successfully)? Can you share a few more details about your job and potentially the TM/JM logs? The only thing I found about this is here https://forums.aws.amazon.com/thread.jspa?threadID=130172 but Flink does not directly call getObject(). We rely on Hadoop code to read objects. In addition, we do not interact directly with the connection pool. Cheers, Kostas On Mon, Mar 4, 2019 at 1:53 PM Averell <[hidden email]> wrote: Hello everyone, |
Hello Kostas,
Thanks for your time. I started that job afresh, with the checkpoint interval set to 15 minutes. It completed the first 13 checkpoints successfully, and only started failing from the 14th. I waited for about 20 more checkpoints, but all of them failed. Then I cancelled the job, restored from the last successful checkpoint, and there were no more issues. Today, I had another try - restoring from the last successful checkpoint from yesterday. Result: I started getting the same error from the first checkpoint after the restore. I tried to cancel and restore again, and there have been no more issues so far (35 more checkpoints already). Regarding my job: I have 6 different S3-file-source streams connected/unioned together, and then connected to a 7th S3-file-source broadcast stream. Sinks are S3 parquet files and Elasticsearch. Checkpointing is incremental and uses RocksDB. This broadcast stream is one of the new changes to my job. The previous version with 4 out of those 6 sources has been running well for more than a month without any issue. TM/JM logs for the first run today (the failed one) are attached. The Yarn/EMR cluster is dedicated to the job. I have a feeling that the issue comes from that broadcast stream (as mentioned in the documentation, it doesn't use RocksDB), but I am not quite sure. Thanks and regards, Averell logs.gz <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/logs.gz> -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
In reply to this post by Kostas Kloudas-2
Hi Kostas, and everyone,
Just an update on my issue. I have tried the following: * changed the S3-related configuration in Hadoop as suggested by the Hadoop documentation [1]: increased /fs.s3a.threads.max/ from 10 to 100, and /fs.s3a.connection.maximum/ from 15 to 120. For reference, I have only 3 S3 sinks, with parallelisms of 4, 4, and 1. * followed AWS's document [2] to increase their EMRFS maxConnections to 200. However, I doubt that this would make any difference, as when creating the S3 parquet bucket sink, I needed to use an "s3a://..." path. "s3://..." seems not to be supported by Flink yet. * reduced the parallelism for my S3 continuous files reader. However, the problem still occurred randomly (randomly across job executions; when it occurred, the only solution was to cancel the job and restart from the last successful checkpoint). Thanks and regards, Averell [1] Hadoop-AWS module: Integration with Amazon Web Services <https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#aTimeout_waiting_for_connection_from_pool_when_writing_to_S3A> [2] emr-timeout-connection-wait <https://aws.amazon.com/premiumsupport/knowledge-center/emr-timeout-connection-wait/> -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi Kostas and everyone,
I changed setFailOnCheckpointingErrors from True to False, and got the following trace in the Flink GUI when the checkpoint/upload failed. I am not sure whether it will be of any help in identifying the issue. BTW, could you please tell me where to find the log file that the Flink GUI's Exception tab is reading from? Thanks and regards, Averell java.lang.ArrayIndexOutOfBoundsException: 122626 at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainLongDictionaryValuesWriter.fallBackDictionaryEncodedData(DictionaryValuesWriter.java:397) at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.fallBackAllValuesTo(DictionaryValuesWriter.java:130) at org.apache.parquet.column.values.fallback.FallbackValuesWriter.fallBack(FallbackValuesWriter.java:153) at org.apache.parquet.column.values.fallback.FallbackValuesWriter.checkFallback(FallbackValuesWriter.java:147) at org.apache.parquet.column.values.fallback.FallbackValuesWriter.writeLong(FallbackValuesWriter.java:181) at org.apache.parquet.column.impl.ColumnWriterV1.write(ColumnWriterV1.java:228) at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addLong(MessageColumnIO.java:449) at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:327) at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278) at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191) at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165) at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128) at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299) at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52) at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50) at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:748) -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Free forum by Nabble | Edit this page |