Network issue leading to "No pooled slot available"

7 messages

Network issue leading to "No pooled slot available"

Dan Diephouse
I am now using the S3 StreamingFileSink to write data to an S3 bucket. When the network connection has problems, it seems to put Flink into an unrecoverable state. Am I understanding this correctly? Any suggestions on how to troubleshoot or fix this?

Here is what I'm observing:

1. The network connection drops.

2. S3 connections do not exit gracefully

2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).] o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did not react to cancelling signal for 30 seconds, but is stuck in method:
 java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
java.base@14.0.2/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
java.base@14.0.2/java.net.Socket$SocketInputStream.read(Socket.java:982)
java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
java.base@14.0.2/sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
java.base@14.0.2/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
java.base@14.0.2/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
java.base@14.0.2/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
app//org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
app//org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
app//org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
app//com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1323)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
app//com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3574)
app//org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)

3. Tasks do not complete

2020-10-07 21:00:37.458 ERROR 1 --- [9580107498927).] o.a.f.runtime.taskexecutor.TaskExecutor  : Task did not exit gracefully within 180 + seconds.

org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

2020-10-07 21:00:37.459 ERROR 1 --- [9580107498927).] o.a.f.runtime.minicluster.MiniCluster    : TaskManager #0 failed.

org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

4. When Flink tries to restart the job, there are no slots available

2020-10-07 21:00:37.486  INFO 1 --- [t-dispatcher-46] o.a.f.r.executiongraph.ExecutionGraph    : Compute vehicle location from tag reads -> Sink: Vehicle Event Sink (2/12) (064e7b4692314286791305bfa636b209) switched from SCHEDULED to FAILED on not deployed.

java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2152) ~[na:na]
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:360) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:348) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[na:na]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[scala-library-2.11.12.jar:na]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[scala-library-2.11.12.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:na]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
... 27 common frames omitted
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92.
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:391) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:457) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
... 20 common frames omitted
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92. Requested resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$11(SlotManagerImpl.java:882) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52) ~[flink-core-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:878) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$9(SlotManagerImpl.java:865) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52) ~[flink-core-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:865) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:386) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
... 27 common frames omitted
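Steps 2 to 4 line up with how Flink cancels a task: the task thread is interrupted, the interrupt is repeated every `task.cancellation.interval` (30 s by default, hence the "did not react to cancelling signal for 30 seconds" warning), and if the thread is still alive after `task.cancellation.timeout` (180 s by default) the TaskManager is failed. These logs come from a MiniCluster, and "TaskManager #0 failed" suggests its only TaskManager went down, which would leave nothing to offer slots for the restart. Below is a hypothetical, scaled-down Java model of that interrupt-and-timeout pattern; the class, method names and millisecond values are made up for illustration and this is not Flink's code. It shows that a task which swallows interrupts ends in the fatal-timeout branch no matter how often it is interrupted.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, scaled-down model of a cancel sequence: interrupt the task thread,
 * re-interrupt at a fixed interval, and declare a fatal error after a hard timeout.
 * This is NOT Flink's actual implementation.
 */
public class CancelWatchdogModel {

    enum Outcome { CANCELLED, FATAL_TIMEOUT }

    /** Interrupts {@code task} every {@code intervalMs} until it exits or {@code timeoutMs} elapses. */
    static Outcome cancel(Thread task, long intervalMs, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (task.isAlive() && System.nanoTime() < deadline) {
            task.interrupt(); // the "cancelling signal"
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            try {
                // join(0) would wait forever, so always wait at least 1 ms
                task.join(Math.max(1, Math.min(intervalMs, remainingMs)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // the watchdog itself was interrupted
                break;
            }
            if (task.isAlive()) {
                System.out.println("Task did not react to cancelling signal, state: " + task.getState());
            }
        }
        // In Flink, the analogue of FATAL_TIMEOUT takes down the whole TaskManager.
        return task.isAlive() ? Outcome.FATAL_TIMEOUT : Outcome.CANCELLED;
    }

    /** A task that honours interruption: a blocking sleep that throws when interrupted. */
    static Thread startCooperativeTask() {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // exit promptly on cancellation
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** A task that swallows interrupts, like a retry loop that keeps going after being interrupted. */
    static Thread startStubbornTask(long runForMs) {
        Thread t = new Thread(() -> {
            long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(runForMs);
            while (System.nanoTime() < end) {
                try {
                    Thread.sleep(10); // stands in for one blocking attempt
                } catch (InterruptedException e) {
                    // the interrupt flag is cleared and ignored; the loop just retries
                }
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) {
        System.out.println(cancel(startCooperativeTask(), 50, 500));   // CANCELLED
        System.out.println(cancel(startStubbornTask(5_000), 50, 500)); // FATAL_TIMEOUT
    }
}
```

In the stubborn case the model prints one "did not react" line per interval before giving up, loosely mirroring the repeated warnings in step 2.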

Any thoughts / suggestions are much appreciated.

--
Dan Diephouse
@dandiep

Re: Network issue leading to "No pooled slot available"

r_khachatryan
Hi Dan Diephouse,

From the logs you provided, it does look like 1 causes 2, which in turn causes 3 and then 4, and 2 is a bug.
It's unclear, though, where the interruption is being ignored (in Flink, the Hadoop FS, or the S3 client).
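The first stack trace narrows this down somewhat: the thread is parked in `NioSocketImpl.timedRead` underneath `SSLSocketImpl.startHandshake`, i.e. a blocking read on a plain `java.net.Socket`. As far as I know, on platform threads such reads do not react to `Thread.interrupt()` at all; only the socket's read timeout (`SO_TIMEOUT`) or closing the socket ends them, so the interrupt may never reach the S3 client code while it waits. The following self-contained Java sketch (class and method names are hypothetical; assumes a JDK 11+ runtime and loopback networking) checks this locally: a loopback server accepts the connection but never sends anything, the reader thread is interrupted after 200 ms, and the read still only ends when the 1.5 s timeout fires.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;

/** Checks whether Thread.interrupt() unblocks a read on a plain java.net.Socket. */
public class InterruptVsSocketTimeout {

    static final class Result {
        final String outcome;       // "timeout", "data", or the unexpected exception
        final long elapsedMs;       // how long read() stayed blocked
        final boolean flagStillSet; // reader's interrupt flag after read() returned or threw

        Result(String outcome, long elapsedMs, boolean flagStillSet) {
            this.outcome = outcome;
            this.elapsedMs = elapsedMs;
            this.flagStillSet = flagStillSet;
        }
    }

    static Result blockedRead(int soTimeoutMs, long interruptAfterMs) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket peer = server.accept()) { // accepted but never writes, like a stalled handshake
            client.setSoTimeout(soTimeoutMs);
            Result[] result = new Result[1];
            Thread reader = new Thread(() -> {
                long start = System.nanoTime();
                String outcome;
                try {
                    client.getInputStream().read(); // blocks until data, timeout or close
                    outcome = "data";
                } catch (SocketTimeoutException e) {
                    outcome = "timeout";
                } catch (IOException e) {
                    outcome = "error: " + e;
                }
                long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                result[0] = new Result(outcome, elapsedMs, Thread.currentThread().isInterrupted());
            });
            reader.start();
            Thread.sleep(interruptAfterMs);
            reader.interrupt(); // what a task canceller would do
            reader.join();      // join() also makes result[0] visible to this thread
            return result[0];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Result r = blockedRead(1_500, 200);
        System.out.println(r.outcome + " after " + r.elapsedMs + " ms, interrupt flag still set: " + r.flagStillSet);
    }
}
```

If the read ends via the timeout after roughly 1.5 s with the interrupt flag still set, the interrupt was simply ignored by the socket, which is consistent with the task staying stuck until some socket-level timeout expires.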

What version of Flink are you using?

Regards,
Roman


In reply to Dan Diephouse's message of Wed, Oct 7, 2020, 11:16 PM.

Re: Network issue leading to "No pooled slot available"

Dan Diephouse
I'm using the latest version, 1.11.2.

I would assume the interruption is being ignored in the Hadoop / S3 layer. I was looking at the defaults, and (if I understood correctly) the client will retry 20 times, which would explain why the task never gets cancelled.
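The retry theory can be sanity-checked with back-of-envelope arithmetic. The values below are assumptions, not read from this setup: Hadoop 3.x `core-default.xml` defaults of `fs.s3a.attempts.maximum` = 20, `fs.s3a.connection.establish.timeout` = 5000 ms and `fs.s3a.connection.timeout` = 200000 ms, plus Flink's default `task.cancellation.timeout` of 180 s. The TLS handshake read in the stack trace is presumably bounded by the socket timeout. This Java sketch (class and method names are hypothetical) treats every attempt as hitting both the connect and the socket timeout, counts 20 retries as 21 attempts, and ignores backoff delays and any extra S3A-level retries, so it is a rough bound rather than a precise figure.

```java
import java.time.Duration;

/**
 * Rough worst case for how long one S3 request could keep a task thread blocked.
 * Every number used in main() is an assumption, not a measured or configured value.
 */
public class S3WorstCaseBlocking {

    /** Worst case if every attempt times out on connect and then on the socket read. */
    static Duration worstCase(int retries, Duration connectTimeout, Duration socketTimeout) {
        long attempts = 1L + retries; // the initial try plus each retry
        return connectTimeout.plus(socketTimeout).multipliedBy(attempts);
    }

    public static void main(String[] args) {
        Duration cancellationTimeout = Duration.ofSeconds(180); // assumed task.cancellation.timeout

        // Assumed S3A defaults: 20 retries, 5 s connect timeout, 200 s socket timeout
        Duration defaults = worstCase(20, Duration.ofSeconds(5), Duration.ofSeconds(200));
        // A single stalled attempt with the same timeouts
        Duration oneAttempt = worstCase(0, Duration.ofSeconds(5), Duration.ofSeconds(200));
        // Hypothetical tuned values: 3 retries, 5 s connect timeout, 30 s socket timeout
        Duration tuned = worstCase(3, Duration.ofSeconds(5), Duration.ofSeconds(30));

        for (Duration d : new Duration[] {defaults, oneAttempt, tuned}) {
            System.out.printf("%5d s, exceeds cancellation timeout: %b%n",
                    d.getSeconds(), d.compareTo(cancellationTimeout) > 0);
        }
    }
}
```

Under these assumptions the worst case is 4305 s (about 72 minutes), and even a single stalled attempt (205 s) outlasts the 180 s cancellation timeout, so the retries only make things worse; the hypothetical tuned values come to 140 s. If that holds, the knobs to try would be a lower socket timeout and retry count (Flink's S3 docs describe setting `s3.`-prefixed keys in `flink-conf.yaml`, which flink-s3-fs-hadoop translates to the matching `fs.s3a.` keys; verify this for your version) or a higher `task.cancellation.timeout`.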

In reply to Khachatryan Roman's message of Thu, Oct 8, 2020, 1:27 AM.
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
... 27 common frames omitted
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92.
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:391) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:457) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
... 20 common frames omitted
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92. Requested resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$11(SlotManagerImpl.java:882) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52) ~[flink-core-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:878) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$9(SlotManagerImpl.java:865) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52) ~[flink-core-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:865) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:386) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
... 27 common frames omitted

Any thoughts / suggestions are much appreciated.
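Steps 2 to 4 are linked by Flink's task cancellation watchdog. Flink repeatedly interrupts a cancelled task's thread; the 30 seconds in the warning matches the default task.cancellation.interval. If the thread still has not exited after task.cancellation.timeout (180 s by default), Flink treats this as a fatal TaskManager error. This job runs in a local MiniCluster ("TaskManager #0 failed"), so losing that TaskManager plausibly leaves no slots for the restart. Below is a minimal, stdlib-only sketch of the interrupt-then-give-up pattern. It is not Flink's TaskCancelerWatchDog; the class CancelWatchdog, its helper tasks, and the shortened intervals are hypothetical.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of an interrupt-then-give-up cancellation watchdog (not Flink's code). */
final class CancelWatchdog {

    /**
     * Interrupts worker every intervalMillis until it exits or timeoutMillis have passed.
     * Returns true if the worker exited in time, false if the watchdog gave up.
     */
    static boolean cancel(Thread worker, long intervalMillis, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (worker.isAlive()) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // in Flink, this is roughly where the TaskManager would be failed
            }
            worker.interrupt(); // ask the task to stop
            // +1 keeps the wait positive: join(0) would mean "wait forever"
            long waitMillis = Math.min(intervalMillis, TimeUnit.NANOSECONDS.toMillis(remainingNanos) + 1);
            try {
                worker.join(waitMillis); // wait up to one interval for the task to exit
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // the watchdog itself was cancelled
                return false;
            }
        }
        return true;
    }

    /** Starts a task that honours interrupts: Thread.sleep throws InterruptedException. */
    static Thread cooperativeTask() {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // exit promptly on cancellation
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Starts a task that swallows interrupts, standing in for a call stuck in blocking I/O. */
    static Thread stubbornTask(long runMillis) {
        Thread t = new Thread(() -> {
            long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(runMillis);
            while (System.nanoTime() < end) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException ignored) {
                    // interrupt swallowed: the task keeps running
                }
            }
        });
        t.setDaemon(true); // must not keep the JVM alive after the watchdog gives up
        t.start();
        return t;
    }

    public static void main(String[] args) {
        System.out.println(cancel(cooperativeTask(), 50, 1_000)); // true
        System.out.println(cancel(stubbornTask(5_000), 50, 500)); // false
    }
}
```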

--
Dan Diephouse
@dandiep


Re: Network issue leading to "No pooled slot available"

Dan Diephouse
Did some digging... it definitely appears that the Amazon SDK is not picking up the interrupt. I will try playing with the connection timeout. Hadoop defaults it to 200000 ms, which may be part of the problem. Anyone have any other ideas?

In theory this should be fixed by SDK v2, which uses NIO, but I don't think I'm up for all the changes that switching would require in the downstream components.
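The timeout mentioned here is presumably Hadoop's fs.s3a.connection.timeout, the socket timeout that Dan reports defaults to 200000 ms. Flink's S3 filesystem plugins accept fs.s3a.* keys in flink-conf.yaml under the s3. prefix, for example s3.connection.timeout. This setting matters because the stuck frame is a plain socket read inside the TLS handshake. Such a read returns only when data arrives, the socket is closed, or the socket timeout fires. The stdlib-only sketch below shows a read against a local peer that never answers giving up after roughly the configured timeout. The class name SilentPeer is hypothetical, and the 200 ms timeout is chosen only to keep the example fast.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

/** Hypothetical demo: a read from a silent peer is bounded only by the socket timeout. */
final class SilentPeer {

    /** Reads from a local peer that never writes and returns how long the read blocked (ms). */
    static long blockedReadMillis(int soTimeoutMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket peer = server.accept()) { // accepted and kept open, but never written to
            client.setSoTimeout(soTimeoutMillis); // 0 would mean "block forever"
            long start = System.nanoTime();
            try {
                client.getInputStream().read();
                throw new IllegalStateException("silent peer unexpectedly sent data or closed");
            } catch (SocketTimeoutException expected) {
                return (System.nanoTime() - start) / 1_000_000;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read gave up after ~" + blockedReadMillis(200) + " ms");
    }
}
```

Each hung attempt costs one full socket timeout, so lowering the timeout shortens how long a stuck call can ignore cancellation. A good value depends on the network, and this sketch cannot determine it.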

On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse <[hidden email]> wrote:
Using the latest: 1.11.2.

I would assume the interruption is being ignored in the Hadoop / S3 layer. I was looking at the defaults, and (if I understood correctly) the client will retry 20 times, which would explain why it never gets cancelled...
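A back-of-the-envelope check supports this explanation. The 20 retries are presumably Hadoop's fs.s3a.attempts.maximum, which S3A passes to the AWS SDK as its retry count. If each of those attempts blocks for the full 200000 ms default socket timeout, a single stuck call can hang for over an hour. That is far beyond the 180 s after which Flink gives up on cancellation. The sketch assumes every attempt hits the full timeout and ignores backoff delays. RetryBudget is a hypothetical helper, and the "tuned" values are purely illustrative.

```java
import java.time.Duration;

/** Hypothetical helper: worst case for a call that retries after socket timeouts. */
final class RetryBudget {

    /** Assumes every attempt blocks for the full socket timeout; backoff delays are ignored. */
    static Duration worstCase(int attempts, Duration socketTimeout) {
        return socketTimeout.multipliedBy(attempts);
    }

    /** True if that worst case fits inside the task cancellation timeout. */
    static boolean fitsCancellationTimeout(int attempts, Duration socketTimeout, Duration cancellationTimeout) {
        return worstCase(attempts, socketTimeout).compareTo(cancellationTimeout) <= 0;
    }

    public static void main(String[] args) {
        Duration cancellation = Duration.ofSeconds(180);      // the 180 s from the watchdog error
        Duration socketTimeout = Duration.ofMillis(200_000);  // the Hadoop default mentioned in the thread

        System.out.println(worstCase(20, socketTimeout).toMinutes() + " min");        // 66 min
        System.out.println(fitsCancellationTimeout(20, socketTimeout, cancellation)); // false
        // Purely illustrative tuned values: 3 attempts with a 10 s timeout.
        System.out.println(fitsCancellationTimeout(3, Duration.ofSeconds(10), cancellation)); // true
    }
}
```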

On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <[hidden email]> wrote:
Hi Dan Diephouse,

From the logs you provided, it does indeed look like 1 causes 2 => 3 => 4, where 2 is a bug.
It's unclear, though, where the interruption is ignored (Flink, Hadoop FS, or the S3 client).

What version of Flink are you using?

Regards,
Roman
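One clue to where the interrupt is lost: the stuck frame is a blocking read on a java.net.Socket. As far as I know, a platform thread blocked in such a read does not wake up on Thread.interrupt(). The Socket.close() Javadoc does guarantee that any thread blocked in I/O on the socket receives a SocketException, which is the usual way to abort blocking socket I/O from another thread. The stdlib-only sketch below releases a read that has no timeout by closing the socket from the calling thread. The class name CloseToCancel is hypothetical.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a blocked socket read is released by closing the socket. */
final class CloseToCancel {

    /** Blocks a reader on a silent peer, closes the socket after a delay, and reports how the read ended. */
    static String readThenClose(long closeAfterMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket peer = server.accept()) { // open but silent
            // No SO_TIMEOUT is set, so only data, EOF, or a close can end this read.
            CompletableFuture<String> reader = CompletableFuture.supplyAsync(() -> {
                try {
                    client.getInputStream().read();
                    return "returned normally";
                } catch (SocketException e) {
                    return "SocketException"; // expected once the socket is closed
                } catch (IOException e) {
                    return e.getClass().getSimpleName();
                }
            });
            TimeUnit.MILLISECONDS.sleep(closeAfterMillis);
            client.close(); // the "cancel" action: unblocks the reader thread
            return reader.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readThenClose(200)); // SocketException
    }
}
```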


Re: Network issue leading to "No pooled slot available"

Dan Diephouse
Well, I just dropped in the latest Amazon SDK (1.11.878), and now it appears to respect interrupts in a test case I created. (The test fails with the SDK version that Flink uses.)

I will try it in a full-fledged Flink environment and report back.
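The sketch below shows one generic way a client can "respect interrupts" in a retry loop. Each attempt is bounded by a timeout, and the loop checks the thread's interrupt flag before every retry. A cancelled task therefore stops after the attempt in progress instead of working through the whole retry budget. This is a stdlib-only pattern with hypothetical names (InterruptAwareRetry, slowTimeout, attemptsBeforeStop). I have not checked how the newer AWS SDK actually handles interrupts.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical retry loop that stops retrying once its thread has been interrupted. */
final class InterruptAwareRetry {

    /** Runs attempt up to maxAttempts times, retrying only on socket timeouts. */
    static <T> T call(Callable<T> attempt, int maxAttempts) throws Exception {
        for (int i = 1; ; i++) {
            if (Thread.interrupted()) { // checks and clears the flag, like JDK blocking methods
                throw new InterruptedException("cancelled before attempt " + i);
            }
            try {
                return attempt.call();
            } catch (SocketTimeoutException e) {
                if (i >= maxAttempts) {
                    throw e; // retry budget exhausted
                }
            }
        }
    }

    /** Simulated attempt: blocks for attemptMillis without reacting to interrupts, then times out. */
    static Callable<Void> slowTimeout(long attemptMillis, AtomicInteger counter) {
        return () -> {
            counter.incrementAndGet();
            long end = System.nanoTime() + attemptMillis * 1_000_000L;
            while (System.nanoTime() < end) {
                Thread.onSpinWait(); // busy-wait that ignores interrupts, like a read stuck in native code
            }
            throw new SocketTimeoutException("simulated read timeout");
        };
    }

    /** Runs the loop on a worker, interrupts it after a delay, and returns how many attempts ran. */
    static int attemptsBeforeStop(long attemptMillis, int maxAttempts, long interruptAfterMillis) {
        AtomicInteger attempts = new AtomicInteger();
        Thread worker = new Thread(() -> {
            try {
                call(slowTimeout(attemptMillis, attempts), maxAttempts);
            } catch (Exception expected) {
                // InterruptedException (cancelled) or SocketTimeoutException (budget spent)
            }
        });
        worker.start();
        try {
            Thread.sleep(interruptAfterMillis);
            worker.interrupt();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        // 20 attempts of 50 ms would take about 1 s; the interrupt at 120 ms stops it after a few.
        System.out.println(attemptsBeforeStop(50, 20, 120));
    }
}
```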

On Thu, Oct 8, 2020 at 3:41 PM Dan Diephouse <[hidden email]> wrote:
Did some digging... it definitely appears that the Amazon SDK is not picking up the interrupt. I will try playing with the connection timeout. Hadoop defaults it to 200000 ms, which may be part of the problem. Anyone have any other ideas?

In theory this should be fixed by SDK v2, which uses NIO, but I don't think I'm up for all the changes that switching would require in the downstream components.

On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse <[hidden email]> wrote:
Using the latest - 1.11.2.

I would assume the interruption is being ignored in the Hadoop / S3 layer. I was looking at the defaults and (if I understood correctly) the client will retry 20 times. Which would explain why it never gets cancelled...

On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <[hidden email]> wrote:
Hi Dan Diephouse,

From the logs you provided indeed it looks like 1 causes 2 => 3 => 4, where 2 is a bug.
It's unclear though where the interruption is ignored (Flink/Hadoop FS/S3 client).

What version of Flink are you using?

Regards,
Roman


On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse <[hidden email]> wrote:
I am now using the S3 StreamingFileSink to send data to an S3 bucket. If/when the network connection has issues, it seems to put Flink into an irrecoverable state. Am I understanding this correctly? Any suggestions on how to troubleshoot / fix?

Here is what I'm observing:

1. Network is dropped 

2. S3 connections do not exit gracefully

2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).] o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did not react to cancelling signal for 30 seconds, but is stuck in method:
 java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
java.base@14.0.2/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
java.base@14.0.2/java.net.Socket$SocketInputStream.read(Socket.java:982)
java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
java.base@14.0.2/sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
java.base@14.0.2/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
java.base@14.0.2/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
java.base@14.0.2/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
app//org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
app//org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
app//org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
app//com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1323)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
app//com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3574)
app//org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)

3. Tasks do not complete

2020-10-07 21:00:37.458 ERROR 1 --- [9580107498927).] o.a.f.runtime.taskexecutor.TaskExecutor  : Task did not exit gracefully within 180 + seconds.

org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

2020-10-07 21:00:37.459 ERROR 1 --- [9580107498927).] o.a.f.runtime.minicluster.MiniCluster    : TaskManager #0 failed.

org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

4. When trying to restart, there are no slots

2020-10-07 21:00:37.486  INFO 1 --- [t-dispatcher-46] o.a.f.r.executiongraph.ExecutionGraph    : Compute vehicle location from tag reads -> Sink: Vehicle Event Sink (2/12) (064e7b4692314286791305bfa636b209) switched from SCHEDULED to FAILED on not deployed.

java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2152) ~[na:na]
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:360) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:348) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[na:na]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[scala-library-2.11.12.jar:na]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[scala-library-2.11.12.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:na]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
... 27 common frames omitted
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92.
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:391) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:457) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
... 20 common frames omitted
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92. Requested resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$11(SlotManagerImpl.java:882) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52) ~[flink-core-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:878) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$9(SlotManagerImpl.java:865) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52) ~[flink-core-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:865) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:386) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
... 27 common frames omitted

Any thoughts / suggestions are much appreciated.

--
Dan Diephouse
@dandiep

Re: Network issue leading to "No pooled slot available"

r_khachatryan
Thanks for checking this workaround!

I've created a Jira issue [1] to check whether the AWS SDK version bundled in the Flink distribution can be upgraded.

Regards,
Roman


On Fri, Oct 9, 2020 at 12:54 AM Dan Diephouse <[hidden email]> wrote:
Well, I just dropped in the latest Amazon SDK (1.11.878), and it now appears to respect interrupts in a test case I created. (The same test fails with the SDK version that Flink uses.)

I will try it in a full-fledged Flink environment and report back.
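Dan's test case drives the AWS SDK itself and is not shown in the thread. As a hypothetical, stdlib-only stand-in for the JDK behavior visible in the stack trace (a thread parked in `NioSocketImpl` under a `Socket` read), the sketch below assumes a loopback peer that accepts the connection but never sends anything. It interrupts the blocked reader, roughly what Flink's canceller does to a task thread, and then closes the socket from another thread. On a platform thread, `Thread.interrupt()` is generally not expected to unblock classic `java.net.Socket` I/O, while closing the socket is. The class name `InterruptVsClose` is illustrative, and the sketch does not claim to show how the newer SDK implements its fix.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: does interrupting a thread blocked in Socket read() unblock it?
final class InterruptVsClose {

    static String run() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket silentPeer = server.accept()) {     // accepts but never writes
            AtomicReference<String> outcome = new AtomicReference<>("still blocked");
            CountDownLatch finished = new CountDownLatch(1);
            Thread reader = new Thread(() -> {
                try {
                    InputStream in = client.getInputStream();
                    in.read();                          // blocks: the peer sends nothing
                    outcome.set("read returned");
                } catch (SocketException e) {
                    outcome.set("unblocked by close");  // thrown once another thread closes the socket
                } catch (IOException e) {
                    outcome.set("other I/O error: " + e);
                } finally {
                    finished.countDown();
                }
            });
            reader.start();
            Thread.sleep(200);                          // let the reader park in read()
            reader.interrupt();                         // what a cancelling task thread receives
            boolean exitedOnInterrupt = finished.await(500, TimeUnit.MILLISECONDS);
            client.close();                             // closing the socket unblocks the reader
            finished.await(5, TimeUnit.SECONDS);
            return (exitedOnInterrupt ? "exited on interrupt; " : "ignored interrupt; ") + outcome.get();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling `InterruptVsClose.run()` on a platform thread is expected to report that the interrupt was ignored and that closing the socket unblocked the read. The sketch deliberately uses a platform thread, since interrupt handling for virtual threads in recent JDKs may differ.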

On Thu, Oct 8, 2020 at 3:41 PM Dan Diephouse <[hidden email]> wrote:
Did some digging... it definitely appears that the Amazon SDK is not picking up the interrupt. I will try playing with the connection timeout. Hadoop defaults it to 200000 ms, which may be part of the problem. Does anyone have any other ideas?

In theory this should be fixed by SDK v2, which uses NIO, but I don't think I'm up for all the changes that would require in the downstream components.
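Lowering the timeout bounds the hang rather than fixing the ignored interrupt. As far as we understand S3A, the 200000 ms default is `fs.s3a.connection.timeout`, which S3A passes to the SDK as its socket (read) timeout; the stuck frame in the trace is a read during the TLS handshake, which such a read timeout applies to. With `flink-s3-fs-hadoop`, S3A keys can usually be set in `flink-conf.yaml` with the `s3.` prefix (for example `s3.connection.timeout`), but check the Flink S3 documentation for your version. The stdlib sketch below (hypothetical class `ReadTimeoutBound`) shows only the underlying mechanism: `Socket.setSoTimeout` turns an indefinite read from a silent peer into a `SocketTimeoutException` after the configured time.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical demo: a socket read timeout bounds how long a read from a silent peer can block.
final class ReadTimeoutBound {

    /** Reads one byte from a loopback peer that never writes; returns how long read() blocked, in ms. */
    static long blockedMillis(int soTimeoutMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket silentPeer = server.accept()) {     // accepts but never writes
            client.setSoTimeout(soTimeoutMillis);       // bound on each blocking read, in ms
            InputStream in = client.getInputStream();
            long start = System.nanoTime();
            try {
                in.read();
                throw new IllegalStateException("peer unexpectedly sent data");
            } catch (SocketTimeoutException expected) {
                return (System.nanoTime() - start) / 1_000_000;
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

For example, `ReadTimeoutBound.blockedMillis(300)` should return a value a little above 300. The trade-off is the usual one: a smaller timeout frees a stuck task sooner but can also fail slow yet healthy requests.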

On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse <[hidden email]> wrote:
Using the latest: 1.11.2.

I would assume the interruption is being ignored in the Hadoop / S3 layer. I was looking at the defaults and, if I understood correctly, the client will retry 20 times, which would explain why it never gets cancelled...
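The retry count matters because each hung attempt can wait for the full socket timeout. A rough lower bound is attempts × timeout, ignoring backoff and assuming the 20 is a retry count on top of the first attempt (S3A's own retry layer, if active in your Hadoop version, could multiply this further). The sketch also compares a single attempt against Flink's cancellation watchdog, whose default `task.cancellation.timeout` of 180000 ms matches the "180 + seconds" in the log. As we understand it, Flink treats that timeout expiring as fatal for the TaskManager, which is consistent with the "TaskManager #0 failed" line and the missing slots afterwards in a MiniCluster. The class `S3aWorstCase` is hypothetical.

```java
// Hypothetical back-of-the-envelope helper; backoff delays between attempts are ignored.
final class S3aWorstCase {

    /** Rough lower bound on how long one S3 call can block if every attempt hangs until the socket timeout. */
    static long worstCaseMillis(long socketTimeoutMillis, int maxRetries) {
        long attempts = 1L + maxRetries;  // assumption: the first try plus each retry
        return attempts * socketTimeoutMillis;
    }

    /** True if even a single hung attempt outlives Flink's task cancellation watchdog. */
    static boolean exceedsWatchdog(long socketTimeoutMillis, long cancellationTimeoutMillis) {
        return socketTimeoutMillis > cancellationTimeoutMillis;
    }
}
```

With the figures from this thread, `worstCaseMillis(200000, 20)` is 4,200,000 ms (70 minutes), and `exceedsWatchdog(200000, 180000)` is true: one hung 200-second attempt already outlives the 180-second watchdog.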

On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <[hidden email]> wrote:
Hi Dan Diephouse,

From the logs you provided, it does indeed look like 1 causes 2 => 3 => 4, where 2 is a bug.
It's unclear, though, where the interruption is ignored (Flink, the Hadoop FS, or the S3 client).
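One way to narrow this down is to read the stack Flink prints when a task ignores the cancelling signal from the innermost frame outwards and name the first library layer it reaches. The helper below is a hypothetical sketch; its package-prefix-to-layer mapping is an assumption fitted to the stack in this thread. It shows where the thread is blocked, which is a starting point rather than proof of which layer swallows the interrupt.

```java
import java.util.Optional;

// Hypothetical troubleshooting helper: which layer is the innermost library frame in?
final class StuckLayer {

    /** Walks a stack from the innermost frame (index 0) outwards and names the first library layer found. */
    static Optional<String> innermostLayer(StackTraceElement[] stack) {
        for (StackTraceElement frame : stack) {
            String cls = frame.getClassName();
            if (cls.startsWith("com.amazonaws.") || cls.startsWith("org.apache.http.")) {
                return Optional.of("S3 client (AWS SDK v1 / Apache HttpClient)");
            }
            if (cls.startsWith("org.apache.hadoop.")) {
                return Optional.of("Hadoop FS (S3A)");
            }
            if (cls.startsWith("org.apache.flink.")) {
                return Optional.of("Flink");
            }
        }
        return Optional.empty();  // only JDK or unrecognized frames
    }

    /** Convenience overload for a live thread, e.g. one a watchdog reports as stuck. */
    static Optional<String> innermostLayer(Thread thread) {
        return innermostLayer(thread.getStackTrace());
    }
}
```

Fed the frames from the trace in the original post (innermost `sun.nio.ch.Net.poll`, then `org.apache.http.conn.ssl.SSLConnectionSocketFactory`, and further out `org.apache.hadoop.fs.s3a.S3AFileSystem`), it points at the S3 client layer, which matches Dan's later finding that the SDK was not picking up the interrupt.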

What version of Flink are you using?

Regards,
Roman


Re: Network issue leading to "No pooled slot available"

Dan Diephouse
Quick update: the newer SDK appears to work outside my test case too. I have not encountered this issue at all since the update.

--
Dan Diephouse
@dandiep