ConnectTimeoutException when createPartitionRequestClient


ConnectTimeoutException when createPartitionRequestClient

Wenrui Meng
Hi,

I consistently get a connection timeout when creating a PartitionRequestClient in Flink 1.4. I pinged the connected host from the connecting host, and the ping latency is consistently below 0.1 ms, so the problem is probably not caused by the cluster status. I also tried increasing the max backoff, the network timeout, and some other settings, but that did not help.

I enabled Flink's debug logging but did not find any suspicious or useful information to help me fix the issue. Here is the link to the JobManager and TaskManager logs. The connecting host is the host that throws the exception. The connected host is the host from which the connecting host tries to request the partition.

Since our platform is not up to date yet, the Flink version I used here is 1.4. But I noticed that this code has not changed much on the master branch. Any help will be appreciated.

Here is the stack trace of the exception:

from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'athena485-sjc1/10.70.132.8:34185' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502)
at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + 'athena485-sjc1/10.70.132.8:34185' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:214)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 common frames omitted
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: connection timed out: athena485-sjc1/10.70.132.8:34185
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
... 6 common frames omitted

Thanks,
Wenrui
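
A low ping latency only shows that ICMP packets get through; it does not show that the remote TaskManager's data port accepts TCP connections. As a minimal sketch, the check below opens a plain TCP connection with a timeout from the connecting host. `TcpProbe` and `probe` are hypothetical names, not part of Flink, and the default host and port are simply copied from the stack trace above (the port may differ between runs).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper, not part of Flink: checks whether a TCP port accepts connections. */
class TcpProbe {

    /**
     * Tries to open a TCP connection to host:port.
     * Returns the elapsed time in milliseconds, or -1 if the attempt failed or timed out.
     */
    static long probe(String host, int port, int timeoutMillis) {
        long start = System.nanoTime();
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return (System.nanoTime() - start) / 1_000_000;
        } catch (IOException e) {
            // Covers refused connections, unresolved hosts, and SocketTimeoutException.
            System.err.println("connect to " + host + ":" + port + " failed: " + e);
            return -1;
        }
    }

    public static void main(String[] args) {
        // Defaults are taken from the stack trace in this post; pass your own host and port.
        String host = args.length > 0 ? args[0] : "athena485-sjc1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 34185;
        long millis = probe(host, port, 5_000);
        System.out.println(millis >= 0 ? "connected in " + millis + " ms" : "connection failed");
    }
}
```

If this probe also times out while ping succeeds, the problem is more likely on the accepting side (or in between, such as a firewall or a full accept queue) than in basic host reachability.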

Re: ConnectTimeoutException when createPartitionRequestClient

Till Rohrmann
Hi Wenrui,

from the logs I cannot spot anything suspicious. Which configuration parameters have you changed exactly? Does the JobManager log contain anything suspicious?

The current Flink version changed quite a bit wrt 1.4. Thus, it might be worth a try to run the job with the latest Flink version.

Cheers,
Till

In reply to Wenrui Meng's message of Thu, Jan 3, 2019, 3:00 PM.

Re: ConnectTimeoutException when createPartitionRequestClient

Wenrui Meng
Hi Till,

Thanks for your reply and help on this issue.

I increased taskmanager.network.netty.client.connectTimeoutSec to 1200, which is 20 minutes, but the connection does not seem to respect this timeout.
In addition, I increased both taskmanager.network.request-backoff.max and taskmanager.registration.max-backoff to 20 minutes.

One thing I found helpful to some extent is increasing taskmanager.network.netty.server.numThreads. With 128 threads the job sometimes succeeds, but increasing the value further doesn't solve the problem. We have 100 parallel intermediate results, so there are a lot of partition requests, and I think that is why the connection times out. The solution should be to increase the connection timeout, but it looks like the connection doesn't respect the timeout config.

We will definitely try the latest Flink version. But at Uber, a separate team is responsible for providing the Flink platform, and they will upgrade it at the end of this month. Meanwhile, I would like to ask for help investigating how to increase the connection timeout and make it respected.

Thanks,
Wenrui

In reply to Till Rohrmann's message of Fri, Jan 4, 2019, 5:27 AM.

Re: ConnectTimeoutException when createPartitionRequestClient

Till Rohrmann
Hi Wenrui,

the code to set the connect timeout looks ok to me [1]. I also tested it locally and checked that the timeout is correctly registered in Netty's AbstractNioChannel [2].

Increasing the number of threads to 128 should not be necessary. But it could indicate that there is some long lasting or blocking operation being executed by the threads.

How do job submission and cluster configuration work with AthenaX? Does the platform spawn a new Flink cluster for each job, for which you can specify the cluster configuration?


Cheers,
Till
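
One way to look for the long-lasting or blocking operations mentioned above is to inspect the stack traces of the Netty server threads. The sketch below does this from inside a JVM with `Thread.getAllStackTraces()`; running `jstack <pid>` against the TaskManager process gives the same information from outside. `ThreadDump` is a hypothetical helper, and the thread-name fragment "Flink Netty Server" is an assumption about how these threads are named, so adjust it to whatever names appear in your own dump.

```java
import java.util.Map;

/** Hypothetical helper, not part of Flink: prints stack traces of threads matching a name. */
class ThreadDump {

    /** Returns the name, state, and stack frames of every live thread whose name contains the fragment. */
    static String dump(String nameFragment) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            if (!thread.getName().contains(nameFragment)) {
                continue;
            }
            out.append('"').append(thread.getName()).append("\" state=")
               .append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                out.append("    at ").append(frame).append('\n');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Assumed name fragment; pass a different one as the first argument if needed.
        String fragment = args.length > 0 ? args[0] : "Flink Netty Server";
        System.out.print(dump(fragment));
    }
}
```

Threads that sit in the same non-Netty frame across several dumps taken a few seconds apart are the ones worth a closer look.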

In reply to Wenrui Meng's message of Sat, Jan 5, 2019, 2:22 AM.

Re: ConnectTimeoutException when createPartitionRequestClient

Wenrui Meng
Hi Till,

Thanks for your reply. Our cluster is a YARN cluster. I found that if we decrease the total parallelism, the timeout issue can be avoided, but we do need that number of TaskManagers to process the data. In addition, once I increased the Netty server threads to 128, the error changed to the following one. It seems the cause is different. Could you help take a look?

2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
        at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
        at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
        at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        ... 1 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: athena464-sjc1/10.70.129.13:39466
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
        ... 6 common frames omitted


Thanks,
Wenrui
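
The two traces in this thread differ in their innermost cause. The first ends in Netty's `ConnectTimeoutException`, raised from a scheduled task, while this one ends in `java.net.ConnectException` thrown from `SocketChannelImpl.checkConnect`, so the failure is reported by the JDK socket layer rather than by Netty's timer. One possible reading, not confirmed in the thread, is that the operating system gave up on the connection attempt before the configured Netty timeout elapsed. The sketch below tells the two cases apart by walking the cause chain; `ConnectFailure` and its labels are hypothetical, and the nested `ConnectTimeoutException` is only a stand-in for the Netty class so that the example compiles without Netty.

```java
import java.io.IOException;
import java.net.ConnectException;

/** Hypothetical helper, not part of Flink: reports which layer produced a connect failure. */
class ConnectFailure {

    /** Stand-in for Netty's class of the same name, which also extends ConnectException. */
    static class ConnectTimeoutException extends ConnectException {
        ConnectTimeoutException(String message) {
            super(message);
        }
    }

    /** Walks the cause chain and returns the innermost exception. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    static String classify(Throwable t) {
        Throwable root = rootCause(t);
        // Checked first and by simple name: the Netty class is a ConnectException subclass,
        // and matching by name avoids a compile-time dependency on (shaded) Netty.
        if (root.getClass().getSimpleName().equals("ConnectTimeoutException")) {
            return "netty-connect-timeout";
        }
        if (root instanceof ConnectException) {
            return "os-connect-failure";
        }
        return "other";
    }

    public static void main(String[] args) {
        Throwable first = new IOException("Connecting the channel failed",
                new ConnectTimeoutException("connection timed out"));
        Throwable second = new IOException("Connecting the channel failed",
                new ConnectException("Connection timed out"));
        System.out.println(classify(first));
        System.out.println(classify(second));
    }
}
```

If the second label shows up, raising the Netty client timeout further is unlikely to change anything, because the failure is reported before that timer fires.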

In reply to Till Rohrmann's message of Mon, Jan 7, 2019, 2:38 AM.
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 common frames omitted
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: connection timed out: athena485-sjc1/10.70.132.8:34185
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
... 6 common frames omitted

Thanks,
Wenrui

Re: ConnectTimeoutException when createPartitionRequestClient

Till Rohrmann
Hi Wenrui,

the exception now occurs while finishing the connection creation. I'm not sure whether this is so different. Could it be that your network is overloaded or not very reliable? Have you tried running your Flink job outside of AthenaX?

Cheers,
Till

On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng <[hidden email]> wrote:
Hi Till,

Thanks for your reply. Our cluster is a YARN cluster. I found that decreasing the total parallelism avoids the timeout issue, but we do need that number of TaskManagers to process the data. In addition, once I increased the Netty server threads to 128, the error changed to the following one. It seems the cause is different. Could you help take a look?

2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
        at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
        at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
        at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        ... 1 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: athena464-sjc1/10.70.129.13:39466
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
        ... 6 common frames omitted


Thanks,
Wenrui

On Mon, Jan 7, 2019 at 2:38 AM Till Rohrmann <[hidden email]> wrote:
Hi Wenrui,

the code to set the connect timeout looks ok to me [1]. I also tested it locally and checked that the timeout is correctly registered in Netty's AbstractNioChannel [2].

Increasing the number of threads to 128 should not be necessary. But it could indicate that there is some long lasting or blocking operation being executed by the threads.

How do job submission and cluster configuration work with AthenaX? Does the platform spawn a new Flink cluster for each job, for which you can specify the cluster configuration?


Cheers,
Till

On Sat, Jan 5, 2019 at 2:22 AM Wenrui Meng <[hidden email]> wrote:
Hi Till,

Thanks for your reply and help on this issue.

I increased taskmanager.network.netty.client.connectTimeoutSec to 1200, which is 20 minutes, but the connection does not seem to respect this timeout.
In addition, I increased both taskmanager.network.request-backoff.max and taskmanager.registration.max-backoff to 20 min.

One thing I found helpful to some extent is increasing taskmanager.network.netty.server.numThreads. With 128 threads the job sometimes succeeds, but increasing it further doesn't solve the problem. We have 100 parallel intermediate results, so there are too many partition requests, and I think that's why it times out. The solution should be to increase the connection timeout, but I think there is some issue that makes the connection ignore the timeout config.

We will definitely try the latest Flink version. But at Uber, there is a team responsible for providing the Flink platform, and they will upgrade it at the end of this month. Meanwhile, I would like to ask for help investigating how to increase the connection timeout and have it respected.

Thanks,
Wenrui
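
The settings named in the message above use different units, which is easy to get wrong when a single value such as "20 min" is applied to all of them. The following is a minimal sketch that converts one duration into the value each key would take. The key names come from the message; the units assumed for request-backoff.max (milliseconds) and registration.max-backoff (a duration string) are assumptions that should be checked against the configuration documentation of the Flink version in use, and the class and method names are made up for illustration.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

public class TimeoutSettings {
    /** Builds the key/value pairs from the message, converting one duration into each key's unit. */
    public static Map<String, String> settings(Duration timeout, int serverThreads) {
        Map<String, String> conf = new LinkedHashMap<>();
        // Seconds, as the "Sec" suffix of the key indicates.
        conf.put("taskmanager.network.netty.client.connectTimeoutSec",
                Long.toString(timeout.getSeconds()));
        // Assumption: this key is read as milliseconds.
        conf.put("taskmanager.network.request-backoff.max",
                Long.toString(timeout.toMillis()));
        // Assumption: this key is read as a duration string such as "20 min".
        conf.put("taskmanager.registration.max-backoff",
                timeout.toMinutes() + " min");
        conf.put("taskmanager.network.netty.server.numThreads",
                Integer.toString(serverThreads));
        return conf;
    }

    public static void main(String[] args) {
        // Prints the pairs in "key: value" form, one per line.
        settings(Duration.ofMinutes(20), 128)
                .forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```

Comparing this output with the values that the TaskManager reports at startup is one way to confirm that the settings actually reached the cluster.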

On Fri, Jan 4, 2019 at 5:27 AM Till Rohrmann <[hidden email]> wrote:
Hi Wenrui,

from the logs I cannot spot anything suspicious. Which configuration parameters have you changed exactly? Does the JobManager log contain anything suspicious?

The current Flink version changed quite a bit wrt 1.4. Thus, it might be worth a try to run the job with the latest Flink version.

Cheers,
Till


Re: ConnectTimeoutException when createPartitionRequestClient

Wenrui Meng
Hi Till,

This job is not on AthenaX but on a special Uber version of Flink. I tried pinging the connected host from the connecting host, and the connection seems very stable. For the connection timeout, I did set it to 20 min, but it still reports the timeout after 2 minutes. Could you let me know how you tested the timeout setting locally?

Thanks,
Wenrui
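
One possible reason for a failure after roughly 2 minutes despite a 20 min setting, offered as an assumption and not confirmed anywhere in this thread: the second stack trace ends in java.net.ConnectException raised from SocketChannelImpl.checkConnect, which suggests that the operating system reported the failure, not Netty's timer. On Linux, an unanswered SYN is retransmitted net.ipv4.tcp_syn_retries times (default 6) with a wait that starts at about 1 second and doubles each time, so the kernel gives up after about 127 seconds no matter how large the application-level timeout is. The sketch below only reproduces that arithmetic; the class and method names are made up for illustration.

```java
public class SynRetryBudget {
    /**
     * Total seconds a connect() waits when every SYN goes unanswered:
     * the initial SYN plus `retries` retransmissions, each followed by a
     * wait that starts at initialRtoSeconds and doubles every time.
     */
    public static long totalWaitSeconds(int retries, long initialRtoSeconds) {
        long total = 0;
        long wait = initialRtoSeconds;
        for (int attempt = 0; attempt <= retries; attempt++) {
            total += wait;
            wait *= 2;
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed Linux defaults: net.ipv4.tcp_syn_retries = 6, initial RTO = 1 s.
        System.out.println(totalWaitSeconds(6, 1) + " s"); // prints "127 s"
    }
}
```

If this is what happens here, raising connectTimeoutSec alone would not change the outcome. Running `sysctl net.ipv4.tcp_syn_retries` on the connecting host shows whether the default applies.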

On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann <[hidden email]> wrote:
Hi Wenrui,

the exception now occurs while finishing the connection creation. I'm not sure whether this is so different. Could it be that your network is overloaded or not very reliable? Have you tried running your Flink job outside of AthenaX?

Cheers,
Till


Re: ConnectTimeoutException when createPartitionRequestClient

Till Rohrmann
Hi Wenrui,

I executed AutoParallelismITCase#testProgramWithAutoParallelism and set a breakpoint at NettyClient.java:102 to see whether the configured timeout value is set correctly. I did the same at AbstractNioChannel.java:207, and it looked as if the correct timeout value was set.

What is the special Uber Flink version? What patches does it include? Are you able to run your tests with the latest vanilla Flink version?

Cheers,
Till
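
A complementary check that needs no debugger is to time a plain TCP connect from the connecting host to the remote TaskManager's data port. Unlike ping, which only shows ICMP reachability, this shows whether a TCP connection to that port is accepted and how long a failed attempt really takes. The following is a minimal sketch using only the JDK; the class name, method name, and command-line arguments are hypothetical, and it does not go through Flink's or Netty's code path.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ConnectProbe {
    /** Tries a TCP connect and returns a one-line result with the elapsed time. */
    public static String probe(String host, int port, int timeoutMillis) {
        long start = System.nanoTime();
        String outcome;
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            outcome = "connected";
        } catch (IOException e) {
            // SocketTimeoutException: the timeout passed in fired first.
            // ConnectException: the connection was refused or the OS gave up.
            outcome = e.getClass().getSimpleName() + ": " + e.getMessage();
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        return outcome + " after " + elapsedMs + " ms";
    }

    public static void main(String[] args) {
        if (args.length != 3) {
            System.out.println("usage: java ConnectProbe <host> <port> <timeoutMillis>");
            return;
        }
        System.out.println(probe(args[0], Integer.parseInt(args[1]), Integer.parseInt(args[2])));
    }
}
```

Running it with a timeout well above 2 minutes while the job is under load would show which side ends the attempt first: a SocketTimeoutException points at the application-level timeout, a ConnectException at the operating system.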

On Wed, Jan 9, 2019 at 10:40 AM Wenrui Meng <[hidden email]> wrote:
Hi Till,

This job is not on AthenaX but on a special Uber version of Flink. I tried pinging the connected host from the connecting host, and the connection seems very stable. For the connection timeout, I did set it to 20 min, but it still reports the timeout after 2 minutes. Could you let me know how you tested the timeout setting locally?

Thanks,
Wenrui



Cheers,
Till

On Sat, Jan 5, 2019 at 2:22 AM Wenrui Meng <[hidden email]> wrote:
Hi Till,

Thanks for your reply and help on this issue.

I increased taskmanager.network.netty.client.connectTimeoutSec to 1200, which is 20 minutes, but the connection does not seem to respect this timeout.
In addition, I increased both taskmanager.network.request-backoff.max and taskmanager.registration.max-backoff to 20 minutes.

One thing I found helpful to some extent is increasing taskmanager.network.netty.server.numThreads. When I increased it to 128 threads, the job succeeded sometimes, but increasing it further doesn't solve the problem. We have 100 parallel intermediate results, so there are a lot of partition requests; I think that's why it times out. The solution should be to increase the connection timeout, but I think there is an issue where the connection doesn't respect the timeout config.

We will definitely try the latest Flink version. But at Uber, a separate team is responsible for providing the Flink platform, and they will upgrade it at the end of this month. Meanwhile, I would like to ask for help investigating how to increase the connection timeout and have it respected.

Thanks,
Wenrui

On Fri, Jan 4, 2019 at 5:27 AM Till Rohrmann <[hidden email]> wrote:
Hi Wenrui,

from the logs I cannot spot anything suspicious. Which configuration parameters have you changed exactly? Does the JobManager log contain anything suspicious?

The current Flink version changed quite a bit wrt 1.4. Thus, it might be worth a try to run the job with the latest Flink version.

Cheers,
Till

On Thu, Jan 3, 2019 at 3:00 PM Wenrui Meng <[hidden email]> wrote:
Hi,

I consistently get a connection timeout when creating the partitionRequestClient in Flink 1.4. I tried pinging the connected host from the connecting host, and the ping latency is consistently below 0.1 ms, so it's probably not due to the cluster status. I also tried increasing the max backoff, the network timeout, and some other settings, but it didn't help.

I enabled Flink's debug log but did not find any suspicious or useful information to help me fix the issue. Here is the link to the jobManager and taskManager logs. The connecting host is the host that throws the exception. The connected host is the host that the connecting host tries to request a partition from.

Since our platform is not up to date yet, the Flink version I used here is 1.4. But I noticed that this code has not changed much on the master branch. Any help will be appreciated.

Here is the stack trace of the exception:

from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'athena485-sjc1/10.70.132.8:34185' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502)
at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + 'athena485-sjc1/10.70.132.8:34185' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:214)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 common frames omitted
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: connection timed out: athena485-sjc1/10.70.132.8:34185
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
... 6 common frames omitted

Thanks,
Wenrui

Re: ConnectTimeoutException when createPartitionRequestClient

Zhijiang(wangzhijiang999)
In reply to this post by Wenrui Meng
Hi Wenrui,

I suspect another issue might be causing the connection failure. You can check whether the netty server has already bound to and is listening on its port in time, before the client requests the connection. If some time-consuming process during TM startup delays the netty server start, the server is not ready when the client requests the connection, which may cause a connection timeout or failure.

From your description, the problem seems to exist only on some TMs: when you decrease the total parallelism, the job may not be scheduled onto the problematic TM, so the issue does not occur. The default number of netty threads and the default timeout should be reasonable for normal cases.
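The check described above can be sketched as a small standalone probe. This is only an illustration, not part of Flink: the class name PortProbe and the method isListening are hypothetical, and the host and port have to be taken from the failing task manager's log line (for example athena464-sjc1 and 39466 from the trace in this thread).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {

    /** Returns true if a TCP connection to host:port is accepted within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, connect timeout, and unknown host all end up here.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("usage: java PortProbe <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        // 3 seconds is an arbitrary choice for this sketch.
        System.out.println(host + ":" + port + " listening: " + isListening(host, port, 3000));
    }
}
```

Running it from the connecting host right after the TM starts, and again later, would show whether the data port becomes reachable late. Unlike ping, it exercises the TCP port itself rather than ICMP.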

Best,
Zhijiang

------------------------------------------------------------------
From: Wenrui Meng <[hidden email]>
Send Time: Wednesday, January 9, 2019, 18:18
To: Till Rohrmann <[hidden email]>
Cc: user <[hidden email]>; Konstantin <[hidden email]>
Subject: Re: ConnectTimeoutException when createPartitionRequestClient

Hi Till,

This job is not on AthenaX but on a special Uber version of Flink. I tried pinging the connected host from the connecting host, and the connection seems very stable. For the connection timeout, I did set it to 20 minutes, but it still reports the timeout after 2 minutes. Could you let me know how you tested the timeout setting locally?

Thanks,
Wenrui

On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann <[hidden email]> wrote:
Hi Wenrui,

the exception now occurs while finishing the connection creation. I'm not sure whether this is so different. Could it be that your network is overloaded or not very reliable? Have you tried running your Flink job outside of AthenaX?

Cheers,
Till

On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng <[hidden email]> wrote:
Hi Till,

Thanks for your reply. Our cluster is a YARN cluster. I found that if we decrease the total parallelism, the timeout issue can be avoided, but we do need that number of taskManagers to process the data. In addition, once I increased the netty server threads to 128, the error changed to the following one. It seems the cause is different. Could you help take a look?

2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
        at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
        at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
        at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        ... 1 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: athena464-sjc1/10.70.129.13:39466
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
        ... 6 common frames omitted


Thanks,
Wenrui



Re: ConnectTimeoutException when createPartitionRequestClient

Wenrui Meng
In reply to this post by Till Rohrmann
Hi Till,

I will try the local test as you suggested. The Uber Flink version mainly adds integration with Uber's deployment and other infra components. There is no change to the original Flink code flow.

I also found that the issue does not occur with the same settings in other clusters, so I guess there is a network or other hardware issue. But it still bothers me that taskmanager.network.netty.client.connectTimeoutSec might not be the only config affecting the netty connection timeout, since after I increased it to 1200 seconds, the timeout still happened after 2 minutes.
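One possible explanation for the 2 minutes, which is not confirmed anywhere in this thread: the later stack trace ends in java.net.ConnectException raised from sun.nio.ch.SocketChannelImpl.checkConnect, so the operating system may have given up on the TCP handshake before Netty's own timer fired. Assuming a Linux host with the default net.ipv4.tcp_syn_retries of 6 and an initial retransmission timeout of 1 second (both are assumptions about the machines involved), the arithmetic looks like this. The class and method names are made up for the illustration.

```java
import java.time.Duration;

public class SynRetryBudget {

    /**
     * Total time until connect() gives up: the wait after the initial SYN plus the
     * wait after each of `retries` retransmissions, doubling every time.
     */
    static Duration totalWait(Duration initialRto, int retries) {
        Duration total = Duration.ZERO;
        Duration wait = initialRto;
        for (int attempt = 0; attempt <= retries; attempt++) {
            total = total.plus(wait);
            wait = wait.multipliedBy(2);
        }
        return total;
    }

    public static void main(String[] args) {
        // 1 + 2 + 4 + 8 + 16 + 32 + 64 seconds
        System.out.println(totalWait(Duration.ofSeconds(1), 6).getSeconds() + " s"); // prints "127 s"
    }
}
```

If that is what happens here, raising connectTimeoutSec above roughly 127 seconds would have no visible effect, because the kernel reports the failure first.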

Thanks,
Wenrui

On Wed, Jan 9, 2019 at 2:06 AM Till Rohrmann <[hidden email]> wrote:
Hi Wenrui,

I executed AutoParallelismITCase#testProgramWithAutoParallelism and set a breakpoint in NettyClient.java:102 to see whether the configured timeout value is correctly set. Moreover, I did the same for AbstractNioChannel.java:207 and it looked as if the correct timeout value was set.

What is the special uber Flink version? What patches does it include? Are you able to run your tests with the latest vanilla Flink version?

Cheers,
Till

On Thu, Jan 3, 2019 at 3:00 PM Wenrui Meng <[hidden email]> wrote:
Hi,

I consistently get a connection timeout when creating the partitionRequestClient in Flink 1.4. I tried pinging the connected host from the connecting host, and the ping latency is consistently below 0.1 ms, so it's probably not due to the cluster status. I also tried increasing the max backoff, the network timeout, and some other settings, but it didn't help.

I enabled Flink's debug log but did not find any suspicious or useful information to help me fix the issue. Here is the link to the jobManager and taskManager logs. The connecting host is the host that throws the exception. The connected host is the host that the connecting host tries to request a partition from.

Since our platform is not up to date yet, the Flink version I used here is 1.4. But I noticed that this code has not changed much on the master branch. Any help will be appreciated.

Here is the stack trace of the exception:

from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'athena485-sjc1/10.70.132.8:34185' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502)
at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + 'athena485-sjc1/10.70.132.8:34185' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:214)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 common frames omitted
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: connection timed out: athena485-sjc1/10.70.132.8:34185
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
... 6 common frames omitted

Thanks,
Wenrui

Re: ConnectTimeoutException when createPartitionRequestClient

Wenrui Meng
In reply to this post by Zhijiang(wangzhijiang999)
Hi Zhijiang,

Thanks for your reply. At first I also suspected the same cause. But when I read the connected host's log, the netty server starts listening on the correct port 2 seconds after the task manager starts. Comparing the logs of the connected host and the connecting host, it seems the partition request happened after the connected host's netty server started.

But I think there is some other hardware or config issue, as I replied to Till. I will work with our infra team to see whether there is some obvious issue on that cluster. Meanwhile, if anyone knows how to configure the netty NIO channel timeout, please let me know.

Thanks,
Wenrui

On Wed, Jan 9, 2019 at 7:49 PM zhijiang <[hidden email]> wrote:
Hi Wenrui,

I suspect another issue that might cause the connection failure. You can check whether the netty server has already bound and is listening on the port in time, before the client requests the connection. If some time-consuming process during TM startup delays the netty server start, the server is not ready when the client requests the connection, which may cause a connection timeout or failure.

From your description, the problem seems to exist on only some TMs: when you decrease the total parallelism, the job might miss the problematic TM and not hit this issue. The default number of netty threads and the default timeout should be fine for normal cases.

Best,
Zhijiang

------------------------------------------------------------------
From:Wenrui Meng <[hidden email]>
Send Time: Wednesday, January 9, 2019 18:18
To:Till Rohrmann <[hidden email]>
Cc:user <[hidden email]>; Konstantin <[hidden email]>
Subject:Re: ConnectTimeoutException when createPartitionRequestClient

Hi Till,

This job is not on AthenaX but on a special Uber version of Flink. I tried pinging the connected host from the connecting host, and it seems very stable. For the connection timeout, I did set it to 20 min, but it still reports the timeout after 2 minutes. Could you let me know how you tested the timeout setting locally?
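One way to check locally which timeout actually fires is to time a plain TCP connect from the connecting host. The sketch below is only an illustration using java.net.Socket, not Flink or Netty code; the class name ConnectProbe and its outcome names are made up for this example. If the attempt fails with a SocketTimeoutException, the timeout passed by the application expired. If it fails earlier with another IOException (for example java.net.ConnectException: Connection timed out), the operating system gave up first, which is one possible explanation, not confirmed in this thread, for a failure after about 2 minutes despite a 20 minute setting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;

/** Hypothetical helper: times a single TCP connect attempt and classifies the result. */
final class ConnectProbe {

    enum Outcome {
        CONNECTED,             // TCP handshake completed
        APP_TIMEOUT,           // the timeout passed to connect() expired
        FAILED_BEFORE_TIMEOUT  // the OS or resolver reported an error first
    }

    static final class Result {
        final Outcome outcome;
        final long elapsedMillis;
        final String detail;

        Result(Outcome outcome, long elapsedMillis, String detail) {
            this.outcome = outcome;
            this.elapsedMillis = elapsedMillis;
            this.detail = detail;
        }
    }

    /** Tries to connect once, waiting at most timeoutMillis, and reports how it ended. */
    static Result attempt(String host, int port, int timeoutMillis) {
        long start = System.nanoTime();
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return new Result(Outcome.CONNECTED, elapsedMillis(start), "connected");
        } catch (SocketTimeoutException e) {
            // Thrown by Socket.connect when the caller-supplied timeout expires.
            return new Result(Outcome.APP_TIMEOUT, elapsedMillis(start), e.toString());
        } catch (IOException e) {
            // Includes ConnectException (refused, or timed out at the OS level).
            return new Result(Outcome.FAILED_BEFORE_TIMEOUT, elapsedMillis(start), e.toString());
        }
    }

    private static long elapsedMillis(long startNanos) {
        return (System.nanoTime() - startNanos) / 1_000_000L;
    }
}
```

For example, calling ConnectProbe.attempt with the connected host's address, its data port, and a timeout of 1200000 ms, then comparing elapsedMillis with the configured value, would show whether the failure arrives before the configured timeout could have expired.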

Thanks,
Wenrui

On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann <[hidden email]> wrote:
Hi Wenrui,

the exception now occurs while finishing the connection creation. I'm not sure whether this is so different. Could it be that your network is overloaded or not very reliable? Have you tried running your Flink job outside of AthenaX?

Cheers,
Till

On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng <[hidden email]> wrote:
Hi Till,

Thanks for your reply. Our cluster is a YARN cluster. I found that if we decrease the total parallelism, the timeout issue can be avoided. But we do need that number of taskManagers to process the data. In addition, once I increase the netty server threads to 128, the error changes to the following one. It seems the cause is different. Could you help take a look?

2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
        at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
        at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
        at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        ... 1 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: athena464-sjc1/10.70.129.13:39466
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
        ... 6 common frames omitted
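
The two traces in this thread share the same outer IOException and RemoteTransportException and differ only in the innermost "Caused by": the first ends in Netty's ConnectTimeoutException, raised from a scheduled task in AbstractNioChannel, while this one ends in java.net.ConnectException, raised from SocketChannelImpl.checkConnect. A minimal sketch for pulling out that innermost cause when comparing failures is shown below; the class name RootCause is hypothetical and the code uses only the JDK, not Flink or Netty.

```java
/** Hypothetical helper: finds the innermost cause of an exception chain. */
final class RootCause {

    /** Follows getCause() until the end of the chain (bounded, to guard against cycles). */
    static Throwable of(Throwable t) {
        Throwable current = t;
        int depth = 0;
        while (current.getCause() != null && current.getCause() != current && depth < 64) {
            current = current.getCause();
            depth++;
        }
        return current;
    }

    /** Renders the innermost cause the way a "Caused by:" line shows it. */
    static String describe(Throwable t) {
        Throwable root = of(t);
        return root.getClass().getName() + ": " + root.getMessage();
    }
}
```

Logging RootCause.describe(e) next to the full trace makes it easier to see at a glance whether a given failure came from the timeout scheduled inside Netty or from the socket layer underneath it.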


Thanks,
Wenrui

On Mon, Jan 7, 2019 at 2:38 AM Till Rohrmann <[hidden email]> wrote:
Hi Wenrui,

the code to set the connect timeout looks ok to me [1]. I also tested it locally and checked that the timeout is correctly registered in Netty's AbstractNioChannel [2].

Increasing the number of threads to 128 should not be necessary. But it could indicate that there is some long lasting or blocking operation being executed by the threads.

How does the job submission and cluster configuration work with AthenaX? Will the platform spawn for each job a new Flink cluster for which you can specify the cluster configuration?


Cheers,
Till

On Sat, Jan 5, 2019 at 2:22 AM Wenrui Meng <[hidden email]> wrote:
Hi Till,

Thanks for your reply and help on this issue.

I increased taskmanager.network.netty.client.connectTimeoutSec to 1200, which is 20 minutes. But it seems the connection does not respect this timeout.
In addition, I increased both taskmanager.network.request-backoff.max and taskmanager.registration.max-backoff to 20 min.

One thing I found helpful to some extent is increasing taskmanager.network.netty.server.numThreads. After I increased it to 128 threads, the job can succeed sometimes, but increasing it further doesn't solve the problem. We have 100 parallel intermediate results, so there are too many partition requests; I think that's why it times out. The solution should be to increase the connection timeout, but I think there is some issue where the connection doesn't respect the timeout config.

We will definitely try the latest Flink version. But at Uber, there is a team responsible for providing the Flink platform, and they will upgrade it at the end of this month. Meanwhile, I would like to ask for some help investigating how to increase the connection timeout and make it respected.

Thanks,
Wenrui
