https://issues.apache.org/jira/browse/FLINK-7590
I have a similar situation with Flink 1.3.2 on K8S
=========
2017-12-13 00:57:12,403 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell( maxwell.tickets) -> FixedDelayWatermark(maxwell. tickets) -> MaxwellFPSEvent->InfluxDBData( maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3) ( 6ad009755a6009975d197e75afa05e 14) switched from RUNNING to FAILED. AsynchronousException{java. lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell( maxwell.tickets) -> FixedDelayWatermark(maxwell. tickets) -> MaxwellFPSEvent->InfluxDBData( maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3).} at org.apache.flink.streaming. runtime.tasks.StreamTask$ AsyncCheckpointRunnable.run( StreamTask.java:970) at java.util.concurrent. Executors$RunnableAdapter. call(Executors.java:511) at java.util.concurrent. FutureTask.run(FutureTask. java:266) at java.util.concurrent. ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1149) at java.util.concurrent. ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread. java:748) Caused by: java.lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell( maxwell.tickets) -> FixedDelayWatermark(maxwell. tickets) -> MaxwellFPSEvent->InfluxDBData( maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3). ... 6 more Caused by: java.util.concurrent. ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud- prevention-production/ checkpoints/ d5a8b2ab61625cf0aa1e66360b7ad0 af/chk-803/4f485204-3ec5-402a- a57d-fab13e068cbc in order to obtain the stream state handle at java.util.concurrent. FutureTask.report(FutureTask. java:122) at java.util.concurrent. FutureTask.get(FutureTask. java:192) at org.apache.flink.util. FutureUtil.runIfNotDoneAndGet( FutureUtil.java:43) at org.apache.flink.streaming. runtime.tasks.StreamTask$ AsyncCheckpointRunnable.run( StreamTask.java:906) ... 5 more Suppressed: java.lang.Exception: Could not properly cancel managed operator state future. at org.apache.flink.streaming. api.operators. OperatorSnapshotResult.cancel( OperatorSnapshotResult.java: 98) at org.apache.flink.streaming. runtime.tasks.StreamTask$ AsyncCheckpointRunnable. cleanup(StreamTask.java:1023) at org.apache.flink.streaming. runtime.tasks.StreamTask$ AsyncCheckpointRunnable.run( StreamTask.java:961) ... 5 more Caused by: java.util.concurrent. ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud- prevention-production/ checkpoints/ d5a8b2ab61625cf0aa1e66360b7ad0 af/chk-803/4f485204-3ec5-402a- a57d-fab13e068cbc in order to obtain the stream state handle at java.util.concurrent. FutureTask.report(FutureTask. java:122) at java.util.concurrent. FutureTask.get(FutureTask. java:192) at org.apache.flink.util. FutureUtil.runIfNotDoneAndGet( FutureUtil.java:43) at org.apache.flink.runtime. state.StateUtil. discardStateFuture(StateUtil. java:85) at org.apache.flink.streaming. api.operators. OperatorSnapshotResult.cancel( OperatorSnapshotResult.java: 96) ... 7 more Caused by: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud- prevention-production/ checkpoints/ d5a8b2ab61625cf0aa1e66360b7ad0 af/chk-803/4f485204-3ec5-402a- a57d-fab13e068cbc in order to obtain the stream state handle at org.apache.flink.runtime. state.filesystem. FsCheckpointStreamFactory$ FsCheckpointStateOutputStream. closeAndGetHandle( FsCheckpointStreamFactory. java:336) at org.apache.flink.runtime. checkpoint. AbstractAsyncSnapshotIOCallabl e. closeStreamAndGetStateHandle( AbstractAsyncSnapshotIOCallabl e.java:100) at org.apache.flink.runtime. state. DefaultOperatorStateBackend$1. performOperation( DefaultOperatorStateBackend. java:270) at org.apache.flink.runtime. state. DefaultOperatorStateBackend$1. performOperation( DefaultOperatorStateBackend. java:233) at org.apache.flink.runtime.io. async.AbstractAsyncIOCallable. call(AbstractAsyncIOCallable. java:72) at java.util.concurrent. FutureTask.run(FutureTask. java:266) at org.apache.flink.util. FutureUtil.runIfNotDoneAndGet( FutureUtil.java:40) at org.apache.flink.streaming. runtime.tasks.StreamTask$ AsyncCheckpointRunnable.run( StreamTask.java:906) ... 5 more Caused by: com.amazonaws.services.s3. model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 751174B20E6C6A0A, AWS Error Code: RequestTimeout, AWS Error Message: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed., S3 Extended Request ID: dADBPVGflB29xtFb7ydxD2SU3LzHw2 cBkumOK5EX4TYgt+ LVErSOShxPkZmGrCvmT39FHDbIryc= at com.amazonaws.http. AmazonHttpClient. handleErrorResponse( AmazonHttpClient.java:798) at com.amazonaws.http. AmazonHttpClient. executeHelper( AmazonHttpClient.java:421) at com.amazonaws.http. AmazonHttpClient.execute( AmazonHttpClient.java:232) at com.amazonaws.services.s3. AmazonS3Client.invoke( AmazonS3Client.java:3528) at com.amazonaws.services.s3. AmazonS3Client.putObject( AmazonS3Client.java:1393) at com.amazonaws.services.s3. transfer.internal. UploadCallable. uploadInOneChunk( UploadCallable.java:108) at com.amazonaws.services.s3. transfer.internal. UploadCallable.call( UploadCallable.java:100) at com.amazonaws.services.s3. transfer.internal. UploadMonitor.upload( UploadMonitor.java:192) at com.amazonaws.services.s3. transfer.internal. UploadMonitor.call( UploadMonitor.java:150) at com.amazonaws.services.s3. transfer.internal. UploadMonitor.call( UploadMonitor.java:50) ... 4 more [CIRCULAR REFERENCE:java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud- prevention-production/ checkpoints/ d5a8b2ab61625cf0aa1e66360b7ad0 af/chk-803/4f485204-3ec5-402a- a57d-fab13e068cbc in order to obtain the stream state handle] 2017-12-13 00:57:12,404 INFO org.apache.flink.runtime. executiongraph.ExecutionGraph - Job KafkaDemo maxwell.tickets (env:production) ( d5a8b2ab61625cf0aa1e66360b7ad0 af) switched from state RUNNING to FAILING. AsynchronousException{java. lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell( maxwell.tickets) -> FixedDelayWatermark(maxwell. tickets) -> MaxwellFPSEvent->InfluxDBData( maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3).} at org.apache.flink.streaming. runtime.tasks.StreamTask$ AsyncCheckpointRunnable.run( StreamTask.java:970) at java.util.concurrent. Executors$RunnableAdapter. call(Executors.java:511) at java.util.concurrent. FutureTask.run(FutureTask. java:266) at java.util.concurrent. ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1149) at java.util.concurrent. ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread. java:748) Caused by: java.lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell( maxwell.tickets) -> FixedDelayWatermark(maxwell. tickets) -> MaxwellFPSEvent->InfluxDBData( maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3). ... 6 more Caused by: java.util.concurrent. ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud- prevention-production/ checkpoints/ d5a8b2ab61625cf0aa1e66360b7ad0 af/chk-803/4f485204-3ec5-402a- a57d-fab13e068cbc in order to obtain the stream state handle at java.util.concurrent. FutureTask.report(FutureTask. java:122) at java.util.concurrent. FutureTask.get(FutureTask. java:192) at org.apache.flink.util. FutureUtil.runIfNotDoneAndGet( FutureUtil.java:43) at org.apache.flink.streaming. runtime.tasks.StreamTask$ AsyncCheckpointRunnable.run( StreamTask.java:906) ... 5 more Suppressed: java.lang.Exception: Could not properly cancel managed operator state future. at org.apache.flink.streaming. api.operators. OperatorSnapshotResult.cancel( OperatorSnapshotResult.java: 98) at org.apache.flink.streaming. runtime.tasks.StreamTask$ AsyncCheckpointRunnable. cleanup(StreamTask.java:1023) at org.apache.flink.streaming. runtime.tasks.StreamTask$ AsyncCheckpointRunnable.run( StreamTask.java:961) ... 5 more Caused by: java.util.concurrent. ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud- prevention-production/ checkpoints/ d5a8b2ab61625cf0aa1e66360b7ad0 af/chk-803/4f485204-3ec5-402a- a57d-fab13e068cbc in order to obtain the stream state handle at java.util.concurrent. FutureTask.report(FutureTask. java:122) at java.util.concurrent. FutureTask.get(FutureTask. java:192) at org.apache.flink.util. FutureUtil.runIfNotDoneAndGet( FutureUtil.java:43) at org.apache.flink.runtime. state.StateUtil. discardStateFuture(StateUtil. java:85) at org.apache.flink.streaming. api.operators. OperatorSnapshotResult.cancel( OperatorSnapshotResult.java: 96) ... 7 more Caused by: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud- prevention-production/ checkpoints/ d5a8b2ab61625cf0aa1e66360b7ad0 af/chk-803/4f485204-3ec5-402a- a57d-fab13e068cbc in order to obtain the stream state handle at org.apache.flink.runtime. state.filesystem. FsCheckpointStreamFactory$ FsCheckpointStateOutputStream. closeAndGetHandle( FsCheckpointStreamFactory. java:336) at org.apache.flink.runtime. checkpoint. AbstractAsyncSnapshotIOCallabl e. closeStreamAndGetStateHandle( AbstractAsyncSnapshotIOCallabl e.java:100) at org.apache.flink.runtime. state. DefaultOperatorStateBackend$1. performOperation( DefaultOperatorStateBackend. java:270) at org.apache.flink.runtime. state. DefaultOperatorStateBackend$1. performOperation( DefaultOperatorStateBackend. java:233) at org.apache.flink.runtime.io. async.AbstractAsyncIOCallable. call(AbstractAsyncIOCallable. java:72) at java.util.concurrent. FutureTask.run(FutureTask. java:266) at org.apache.flink.util. FutureUtil.runIfNotDoneAndGet( FutureUtil.java:40) at org.apache.flink.streaming. runtime.tasks.StreamTask$ AsyncCheckpointRunnable.run( StreamTask.java:906) ... 5 more Caused by: com.amazonaws.services.s3. model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 751174B20E6C6A0A, AWS Error Code: RequestTimeout, AWS Error Message: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed., S3 Extended Request ID: dADBPVGflB29xtFb7ydxD2SU3LzHw2 cBkumOK5EX4TYgt+ LVErSOShxPkZmGrCvmT39FHDbIryc= at com.amazonaws.http.