remote task manager netty exception

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

remote task manager netty exception

occupiedid
Hi,

We are experiencing some netty issue with our Flink cluster, which we couldn't figure the cause.

Below is the stack trace of exceptions from TM's and JM's perspectives.  we have 85 TMs and one JM in HA mode. The strange thing is that only 23 of the TM are complaining about the connection issue. When this exception occurs, the TM they are complaining about is still up and live. this will cause our job to be stuck in the restart loop for a couple of hours then back to normal. 

We are using HDFS as the state backend and the checkpoint dir.
the application is running in our own data center and in Kubernetes as a standalone job.


## Job Graph

the job graph is like this.
source 1.1 (5 parallelism).  ->
                                                  union ->
source 1.2 (80 parallelism) ->
                                                                    connect -> sink            
source 2.1 (5 parallelism).  ->
                                                  union ->
source 2.2 (80 parallelism) ->


## JM's Stacktrace

```
message="PLI Deduplicate Operator (60/80) (5d2b9fba2eaeae452068bc53e4232d0c) switched from RUNNING to FAILED on 100.98.115.117:6122-924d20 @ 100.98.115.117 (dataPort=41245)."org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed. at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyMessage.writeToChannel(NettyMessage.java:737) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyMessage$PartitionRequest.write(NettyMessage.java:521) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:171) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at java.lang.Thread.run(Thread.java:834) ~[?:?]Caused by: java.nio.channels.ClosedChannelException at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957) ~[flink-dist_2.12-1.12.2.jar:1.12.2] ... 18 more
```

## TM's stacktrace
```
timestamp="2021-04-30 20:23:25,401", level="WARN", thread="PLI Deduplicate Operator (6/80)#6", class="org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory", method="connectWithRetries(line:121)", message="Failed 1 times to connect to /100.98.115.117:41245. Retrying."org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager '/100.98.115.117:41245' has failed. This might indicate that the remote task manager has been lost. at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:145) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:81) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:179) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:321) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:290) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:94) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2] at java.lang.Thread.run(Thread.java:834) [?:?]Caused by: java.lang.NullPointerException at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient.<init>(NettyPartitionRequestClient.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2] ... 16 more

```
Reply | Threaded
Open this post in threaded view
|

Re: remote task manager netty exception

Roman Khachatryan
Hi,

I see that JM and TM failures are different (from TM, it's actually a
warning). Could you please share the ERROR message from TM?

Have you tried increasing taskmanager.network.retries [1]?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#taskmanager-network-retries

Regards,
Roman

On Fri, Apr 30, 2021 at 11:55 PM Sihan You <[hidden email]> wrote:

>
> Hi,
>
> We are experiencing some netty issue with our Flink cluster, which we couldn't figure the cause.
>
> Below is the stack trace of exceptions from TM's and JM's perspectives.  we have 85 TMs and one JM in HA mode. The strange thing is that only 23 of the TM are complaining about the connection issue. When this exception occurs, the TM they are complaining about is still up and live. this will cause our job to be stuck in the restart loop for a couple of hours then back to normal.
>
> We are using HDFS as the state backend and the checkpoint dir.
> the application is running in our own data center and in Kubernetes as a standalone job.
>
>
> ## Job Graph
>
> the job graph is like this.
> source 1.1 (5 parallelism).  ->
>                                                   union ->
> source 1.2 (80 parallelism) ->
>                                                                     connect -> sink
> source 2.1 (5 parallelism).  ->
>                                                   union ->
> source 2.2 (80 parallelism) ->
>
>
> ## JM's Stacktrace
>
> ```
> message="PLI Deduplicate Operator (60/80) (5d2b9fba2eaeae452068bc53e4232d0c) switched from RUNNING to FAILED on 100.98.115.117:6122-924d20 @ 100.98.115.117 (dataPort=41245)."org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed. at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyMessage.writeToChannel(NettyMessage.java:737) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyMessage$PartitionRequest.write(NettyMessage.java:521) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:171) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at java.lang.Thread.run(Thread.java:834) ~[?:?]Caused by: java.nio.channels.ClosedChannelException at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957) ~[flink-dist_2.12-1.12.2.jar:1.12.2] ... 18 more
> ```
>
> ## TM's stacktrace
> ```
> timestamp="2021-04-30 20:23:25,401", level="WARN", thread="PLI Deduplicate Operator (6/80)#6", class="org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory", method="connectWithRetries(line:121)", message="Failed 1 times to connect to /100.98.115.117:41245. Retrying."org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager '/100.98.115.117:41245' has failed. This might indicate that the remote task manager has been lost. at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:145) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:81) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:179) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:321) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:290) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:94) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2] at java.lang.Thread.run(Thread.java:834) [?:?]Caused by: java.lang.NullPointerException at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient.<init>(NettyPartitionRequestClient.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2] ... 16 more
>
> ```
Reply | Threaded
Open this post in threaded view
|

Re: remote task manager netty exception

Yichen Liu
Chime in here since I work with Sihan.

Roman, there isn't much logs beyond this WARN, in fact it should be ERROR since it fail our job and job has to restart.

Here is a fresh new example of "Sending the partition request to 'null' failed." exception. The only log we see before exception was:

timestamp="2021-05-04 14:04:33,014", level="INFO", thread="Latest Billing Info Operator -> (Filter, Filter) (55/80)#12", class="org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend", method="cleanInstanceBasePath(line:462)", message="Closed RocksDB State Backend. Cleaning up RocksDB working directory /tmp/flink-io-9570aace-eec0-4dd9-867f-22a7d367282e/job_00000000000000000000000000000000_op_KeyedProcessOperator_6cf741936dcd5ce8199875ace1f5638a__55_80__uuid_b8ef0675-4355-4b99-9f03-77d1eb713bf4."

timestamp="2021-05-04 14:04:33,633", level="WARN", thread="Latest Billing Info Operator -> (Filter, Filter) (55/80)#12", class="org.apache.flink.runtime.taskmanager.Task", method="transitionState(line:1033)", message="Latest Billing Info Operator -> (Filter, Filter) (55/80)#12 (0f9caefb1122609e3337f2537f7324c3) switched from RUNNING to FAILED." org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed. at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

but this looks more like a consequence than cause of exception.

Note this seems to be pretty consistent when one of our TMs went lost. Could it be somehow partition info isn't up to date on TM when job is restarting?

Also note that we have a pretty huge state, each TM has around 130GB state, TMs have a setting of 10GB memory and 2700m CPU (in k8s unit).

On Mon, May 3, 2021 at 8:29 AM Roman Khachatryan <[hidden email]> wrote:
Hi,

I see that JM and TM failures are different (from TM, it's actually a
warning). Could you please share the ERROR message from TM?

Have you tried increasing taskmanager.network.retries [1]?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#taskmanager-network-retries

Regards,
Roman

On Fri, Apr 30, 2021 at 11:55 PM Sihan You <[hidden email]> wrote:
>
> Hi,
>
> We are experiencing some netty issue with our Flink cluster, which we couldn't figure the cause.
>
> Below is the stack trace of exceptions from TM's and JM's perspectives.  we have 85 TMs and one JM in HA mode. The strange thing is that only 23 of the TM are complaining about the connection issue. When this exception occurs, the TM they are complaining about is still up and live. this will cause our job to be stuck in the restart loop for a couple of hours then back to normal.
>
> We are using HDFS as the state backend and the checkpoint dir.
> the application is running in our own data center and in Kubernetes as a standalone job.
>
>
> ## Job Graph
>
> the job graph is like this.
> source 1.1 (5 parallelism).  ->
>                                                   union ->
> source 1.2 (80 parallelism) ->
>                                                                     connect -> sink
> source 2.1 (5 parallelism).  ->
>                                                   union ->
> source 2.2 (80 parallelism) ->
>
>
> ## JM's Stacktrace
>
> ```
> message="PLI Deduplicate Operator (60/80) (5d2b9fba2eaeae452068bc53e4232d0c) switched from RUNNING to FAILED on 100.98.115.117:6122-924d20 @ 100.98.115.117 (dataPort=41245)."org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed. at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyMessage.writeToChannel(NettyMessage.java:737) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyMessage$PartitionRequest.write(NettyMessage.java:521) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:171) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at java.lang.Thread.run(Thread.java:834) ~[?:?]Caused by: java.nio.channels.ClosedChannelException at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957) ~[flink-dist_2.12-1.12.2.jar:1.12.2] ... 18 more
> ```
>
> ## TM's stacktrace
> ```
> timestamp="2021-04-30 20:23:25,401", level="WARN", thread="PLI Deduplicate Operator (6/80)#6", class="org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory", method="connectWithRetries(line:121)", message="Failed 1 times to connect to /100.98.115.117:41245. Retrying."org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager '/100.98.115.117:41245' has failed. This might indicate that the remote task manager has been lost. at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:145) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:81) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:179) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:321) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:290) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:94) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2] at java.lang.Thread.run(Thread.java:834) [?:?]Caused by: java.lang.NullPointerException at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient.<init>(NettyPartitionRequestClient.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2] ... 16 more
>
> ```


--
YL
Reply | Threaded
Open this post in threaded view
|

Re: remote task manager netty exception

Roman Khachatryan
Hi,

> Could it be somehow partition info isn't up to date on TM when job is restarting?
Partition info should be up to date or become so eventually - but this
is assuming that JM is able to detect the failure.

The latter may not be the case, as Sihan You wrote previously:
> The strange thing is that only 23 of the TM are complaining about the connection issue.
> When this exception occurs, the TM they are complaining about is still up and live.

So it looks like JM to TM network links are OK, but some TM-TM links are down.
Do you have an option to check connection from TM to TM during this
restart loop?

Also it makes sense to check heartbeat interval and timeout [1] so
that JM can detect TM failure quickly enough.

https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#heartbeat-interval

Regards,
Roman


On Tue, May 4, 2021 at 10:17 PM Yichen Liu <[hidden email]> wrote:

>
> Chime in here since I work with Sihan.
>
> Roman, there isn't much logs beyond this WARN, in fact it should be ERROR since it fail our job and job has to restart.
>
> Here is a fresh new example of "Sending the partition request to 'null' failed." exception. The only log we see before exception was:
>
> timestamp="2021-05-04 14:04:33,014", level="INFO", thread="Latest Billing Info Operator -> (Filter, Filter) (55/80)#12", class="org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend", method="cleanInstanceBasePath(line:462)", message="Closed RocksDB State Backend. Cleaning up RocksDB working directory /tmp/flink-io-9570aace-eec0-4dd9-867f-22a7d367282e/job_00000000000000000000000000000000_op_KeyedProcessOperator_6cf741936dcd5ce8199875ace1f5638a__55_80__uuid_b8ef0675-4355-4b99-9f03-77d1eb713bf4."
>
> timestamp="2021-05-04 14:04:33,633", level="WARN", thread="Latest Billing Info Operator -> (Filter, Filter) (55/80)#12", class="org.apache.flink.runtime.taskmanager.Task", method="transitionState(line:1033)", message="Latest Billing Info Operator -> (Filter, Filter) (55/80)#12 (0f9caefb1122609e3337f2537f7324c3) switched from RUNNING to FAILED." org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed. at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>
> but this looks more like a consequence than cause of exception.
>
> Note this seems to be pretty consistent when one of our TMs went lost. Could it be somehow partition info isn't up to date on TM when job is restarting?
>
> Also note that we have a pretty huge state, each TM has around 130GB state, TMs have a setting of 10GB memory and 2700m CPU (in k8s unit).
>
> On Mon, May 3, 2021 at 8:29 AM Roman Khachatryan <[hidden email]> wrote:
>>
>> Hi,
>>
>> I see that JM and TM failures are different (from TM, it's actually a
>> warning). Could you please share the ERROR message from TM?
>>
>> Have you tried increasing taskmanager.network.retries [1]?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#taskmanager-network-retries
>>
>> Regards,
>> Roman
>>
>> On Fri, Apr 30, 2021 at 11:55 PM Sihan You <[hidden email]> wrote:
>> >
>> > Hi,
>> >
>> > We are experiencing some netty issue with our Flink cluster, which we couldn't figure the cause.
>> >
>> > Below is the stack trace of exceptions from TM's and JM's perspectives.  we have 85 TMs and one JM in HA mode. The strange thing is that only 23 of the TM are complaining about the connection issue. When this exception occurs, the TM they are complaining about is still up and live. this will cause our job to be stuck in the restart loop for a couple of hours then back to normal.
>> >
>> > We are using HDFS as the state backend and the checkpoint dir.
>> > the application is running in our own data center and in Kubernetes as a standalone job.
>> >
>> >
>> > ## Job Graph
>> >
>> > the job graph is like this.
>> > source 1.1 (5 parallelism).  ->
>> >                                                   union ->
>> > source 1.2 (80 parallelism) ->
>> >                                                                     connect -> sink
>> > source 2.1 (5 parallelism).  ->
>> >                                                   union ->
>> > source 2.2 (80 parallelism) ->
>> >
>> >
>> > ## JM's Stacktrace
>> >
>> > ```
>> > message="PLI Deduplicate Operator (60/80) (5d2b9fba2eaeae452068bc53e4232d0c) switched from RUNNING to FAILED on 100.98.115.117:6122-924d20 @ 100.98.115.117 (dataPort=41245)."org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed. at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyMessage.writeToChannel(NettyMessage.java:737) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyMessage$PartitionRequest.write(NettyMessage.java:521) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:171) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at java.lang.Thread.run(Thread.java:834) ~[?:?]Caused by: java.nio.channels.ClosedChannelException at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957) ~[flink-dist_2.12-1.12.2.jar:1.12.2] ... 18 more
>> > ```
>> >
>> > ## TM's stacktrace
>> > ```
>> > timestamp="2021-04-30 20:23:25,401", level="WARN", thread="PLI Deduplicate Operator (6/80)#6", class="org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory", method="connectWithRetries(line:121)", message="Failed 1 times to connect to /100.98.115.117:41245. Retrying."org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager '/100.98.115.117:41245' has failed. This might indicate that the remote task manager has been lost. at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:145) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:81) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:179) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:321) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:290) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:94) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2] at java.lang.Thread.run(Thread.java:834) [?:?]Caused by: java.lang.NullPointerException at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient.<init>(NettyPartitionRequestClient.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2] ... 16 more
>> >
>> > ```
>
>
>
> --
> YL