Recovery problem 2 of 2 in Flink 1.6.3

John Stone-2

This is the second of two recovery problems I'm seeing running Flink in Kubernetes.  I'm posting them in separate messages for brevity and because the second is not directly related to the first.  Any advice is appreciated.  First problem: https://lists.apache.org/thread.html/a663a8ce2f697e6d207cb59eff1f77dbb8bd745e3f44aab09866ab46@%3Cuser.flink.apache.org%3E

 

Setup:

Flink 1.6.3 in Kubernetes (flink:1.6.3-hadoop28-scala_2.11).  One JobManager and two TaskManagers (TM_1, TM_2).  Each pod has 4 CPUs.  Each TaskManager has 16 task slots.  High availability is enabled.  S3 (s3a) for storage.  RocksDB with incremental snapshots.  It doesn't matter whether local recovery is enabled: I've managed to replicate the problem with local recovery both enabled and disabled.  The value of "fs.s3a.connection.maximum" is 128.

 

Problem:

When a job dies, Flink + Hadoop neither re-uses the existing connections to S3 nor closes them and creates new ones.

 

Replication Steps:

Create a job with a parallelism of 16; all processing occurs on TM_1.  After a checkpoint has been taken, delete TM_1.  The job is canceled on TM_1, deployed and restored successfully on TM_2, and a new TaskManager (TM_3) is created and successfully registers with the JobManager.  No work is scheduled on TM_3.  After another checkpoint is taken, delete TM_2.  The job is canceled on TM_2 and Flink attempts to deploy it on TM_3, but the deployment fails with "org.apache.flink.fs.s3hadoop.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool".  Flink attempts to recover by canceling on TM_3 and deploying on TM_4, but does not release the task slots on TM_3 (TM_3 now has no free task slots).  The job is deployed to TM_4, where it again fails with "ConnectionPoolTimeoutException: Timeout waiting for connection from pool".  Flink attempts to recover by canceling on TM_4, but does not release the task slots on TM_4 (TM_4 now has no free task slots).  As there are 0 available slots, the job is now stuck in the SCHEDULED state.

 

Actual Behavior:

Shaded Hadoop does not release its hold on S3 connections when the job dies.
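
To make the suspected failure mode concrete, here is a minimal toy model of a bounded pool, assuming that leaked leases are the cause (which is only my hypothesis).  It uses a plain java.util.concurrent.Semaphore and does not touch the shaded Hadoop or AWS SDK classes; BoundedPoolSketch, tryLease and leakThenLease are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Toy stand-in for a bounded HTTP connection pool (not the real pool implementation). */
final class BoundedPoolSketch {
    private final Semaphore permits;

    BoundedPoolSketch(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    /** Tries to lease one connection; returns false if none frees up within the timeout. */
    boolean tryLease(long timeoutMillis) {
        try {
            return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns one connection to the pool. */
    void release() {
        permits.release();
    }

    /**
     * Leases `leaked` connections without ever releasing them, then reports
     * whether one further lease attempt times out.
     */
    static boolean leakThenLease(int maxConnections, int leaked) {
        BoundedPoolSketch pool = new BoundedPoolSketch(maxConnections);
        for (int i = 0; i < leaked; i++) {
            pool.tryLease(10);
        }
        return !pool.tryLease(10);
    }

    public static void main(String[] args) {
        // 128 mirrors the fs.s3a.connection.maximum value from the setup above.
        System.out.println(leakThenLease(128, 128)); // prints "true": the next lease times out
        System.out.println(leakThenLease(128, 127)); // prints "false": one connection is still free
    }
}
```

The point of the sketch is only that a pool of 128 stops serving requests once 128 leases are outstanding and none is returned; it says nothing about where such leases would be held in the real stack.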

 

Expected Behavior:

Hadoop should be told to release its connections when the job dies, or should re-use the existing connections.
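
As an illustration of the release behavior I would expect, here is a small sketch in which a lease is an AutoCloseable, so that try-with-resources returns it to the pool even when the task holding it fails.  This is a generic Java pattern under my own assumptions, not how Flink or the shaded Hadoop code is actually structured; LeaseReleaseSketch and its members are hypothetical names.

```java
import java.util.concurrent.Semaphore;

/** Toy pool whose leases give their permit back when closed (illustrative only). */
final class LeaseReleaseSketch {
    private final Semaphore permits;

    LeaseReleaseSketch(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    /** A leased connection; closing it returns the permit exactly once. */
    final class Lease implements AutoCloseable {
        private boolean closed;

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                permits.release();
            }
        }
    }

    /** Leases a connection or fails immediately if the pool is exhausted. */
    Lease lease() {
        if (!permits.tryAcquire()) {
            throw new IllegalStateException("pool exhausted");
        }
        return new Lease();
    }

    int available() {
        return permits.availablePermits();
    }

    /** Runs a task that fails while holding a lease and returns the free permits afterwards. */
    static int availableAfterFailedTask(int maxConnections) {
        LeaseReleaseSketch pool = new LeaseReleaseSketch(maxConnections);
        try (Lease lease = pool.lease()) {
            throw new RuntimeException("simulated task failure");
        } catch (RuntimeException e) {
            // try-with-resources has already closed the lease before this handler runs
        }
        return pool.available();
    }

    public static void main(String[] args) {
        System.out.println(availableAfterFailedTask(128)); // prints "128": nothing leaked
    }
}
```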

 

Log Snip:

2019-01-10 20:03:40,191 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map (8/16) (aaa18fa82aa555a51474d49ac14665e7) switched from RUNNING to FAILED.
java.io.InterruptedIOException: getFileStatus on s3a://my-s3-bucket/stream-cluster/prod/checkpoints/83d7cb3e6d08318ef2c27878d0fe1bbd: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:125)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:101)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1571)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:1507)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:1482)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1913)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:83)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:443)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1038)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4194)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4141)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1256)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1232)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
    ... 12 more
Caused by: org.apache.flink.fs.s3hadoop.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:292)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:269)
    at sun.reflect.GeneratedMethodAccessor45.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.conn.$Proxy10.get(Unknown Source)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1181)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
    ... 24 more
2019-01-10 20:03:40,192 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streamProcessorJob (83d7cb3e6d08318ef2c27878d0fe1bbd) switched from state RUNNING to FAILING.

 

Many thanks,

 

John Stone

 

Re: Recovery problem 2 of 2 in Flink 1.6.3

Till Rohrmann
Hi John,

This does indeed look strange. How many concurrent operators do you have that write state to S3?

After the cancellation, the JobManager should keep the slots for some time until they are freed. This is the normal behaviour and can be controlled with `slot.idle.timeout`. Could you maybe share the complete logs at DEBUG log level so that we can fully understand the problem? A thread dump of the TM process would also be helpful to see whether there are any blocking operations which keep the HTTP connections open.
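
If attaching an external tool to the TM pod is awkward, a rough in-process alternative is to ask the JVM itself for its thread stacks and keep only the threads that sit in a given package. The following is a minimal sketch using the standard `ThreadMXBean` API; `ThreadDumpSketch` and `threadsWithFrame` are hypothetical names, and the code only sees threads of the JVM it runs in, so it would have to be invoked from inside the TaskManager process (for example from user code) to be of any use here.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.List;

/** Lists live threads of the current JVM that have a stack frame in a given package or class. */
final class ThreadDumpSketch {

    /** Returns "name [STATE]" for every thread with a frame whose class name contains `needle`. */
    static List<String> threadsWithFrame(String needle) {
        List<String> hits = new ArrayList<>();
        // false, false: skip lock details, the stack frames are enough for this purpose
        ThreadInfo[] infos = ManagementFactory.getThreadMXBean().dumpAllThreads(false, false);
        for (ThreadInfo info : infos) {
            for (StackTraceElement frame : info.getStackTrace()) {
                if (frame.getClassName().contains(needle)) {
                    hits.add(info.getThreadName() + " [" + info.getThreadState() + "]");
                    break; // one hit per thread is enough
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // "http.impl.conn" matches the connection-manager frames seen in the stack trace.
        for (String line : threadsWithFrame("http.impl.conn")) {
            System.out.println(line);
        }
    }
}
```

Threads that show up here in a blocked or waiting state across several dumps would be the first candidates for holding connections open.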

Cheers,
Till

On Thu, Jan 10, 2019 at 9:35 PM John Stone <[hidden email]> wrote:
