Cannot connect to queryable state proxy

Cannot connect to queryable state proxy

ChangZhuo Chen (陳昌倬)
Hi,

We have a problem connecting to the queryable state client proxy described
in [0]. Any help is appreciated.


The following is our setup:

* Flink 1.12.1
* Standalone Kubernetes
* Related config in flink-conf.yaml

  ```
  queryable-state.enable: true
  queryable-state.proxy.ports: 6125
  ```

* taskmanager log

  ```
  2021-02-04 03:22:57,650 INFO  org.apache.flink.runtime.io.network.netty.NettyServer        [] - Successful initialization (took 35 ms). Listening on SocketAddress /0.0.0.0:43665.
  2021-02-04 03:22:57,656 INFO  org.apache.flink.runtime.taskexecutor.KvStateService         [] - Starting the kvState service and its components.
  2021-02-04 03:22:57,672 INFO  org.apache.flink.queryablestate.server.KvStateServerImpl     [] - Started Queryable State Server @ /10.200.18.4:9067.
  2021-02-04 03:22:57,679 INFO  org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl [] - Started Queryable State Proxy Server @ /10.200.18.4:6125.
  2021-02-04 03:22:57,698 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/rpc/taskmanager_0 .
  ```

* Port 6125 is open in the taskmanager pod.

  ```
  root@<censored>-654b94754d-2vknh:/tmp# ss -tlp
  State            Recv-Q           Send-Q                     Local Address:Port                      Peer Address:Port          Process
  LISTEN           0                1024                             0.0.0.0:46561                          0.0.0.0:*
  LISTEN           0                3                                0.0.0.0:9249                           0.0.0.0:*
  LISTEN           0                1024                             0.0.0.0:6122                           0.0.0.0:*
  LISTEN           0                1024                         10.200.11.3:9067                           0.0.0.0:*
  LISTEN           0                1024                         10.200.11.3:6125                           0.0.0.0:*
  LISTEN           0                1024                             0.0.0.0:38607                          0.0.0.0:*
  ```

* However, we always get the following error when using the queryable state API:

  ```
  Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:6125
          at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
          at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
          at com.appier.rt.short_term_score.QueryCalculateUserST$.printMapState(QueryCalculateUserST.scala:44)
          at com.appier.rt.short_term_score.QueryCalculateUserST$.main(QueryCalculateUserST.scala:82)
          at com.appier.rt.short_term_score.QueryCalculateUserST.main(QueryCalculateUserST.scala)
  Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:6125
  Caused by: java.net.ConnectException: Connection refused
          at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
          at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779)
          at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
          at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
          at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
          at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
          at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
          at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
          at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
          at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
          at java.base/java.lang.Thread.run(Thread.java:834)
  ```

* `nc` run inside the taskmanager pod itself also gets the same error:

  ```
  root@<censored>:/tmp# nc -vz localhost 6125
  nc: connect to localhost port 6125 (tcp) failed: Connection refused
  nc: connect to localhost port 6125 (tcp) failed: Cannot assign requested address
  ```
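
The `nc -vz` check above can also be scripted, which makes it easy to probe more than one candidate address. The following is a minimal sketch using only the Python standard library; the helper name `can_connect` and the 2-second default timeout are illustrative choices, not part of Flink.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds, similar to `nc -vz`."""
    try:
        # create_connection resolves the host and tries each resolved address in turn.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable or unresolvable hosts.
        return False
```

Inside the pod, comparing `can_connect("localhost", 6125)` with `can_connect("10.200.11.3", 6125)` (the address shown in the `ss` output) should indicate whether the proxy is down or merely bound to a different interface.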


[0] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html

--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debconf,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: Cannot connect to queryable state proxy

ChangZhuo Chen (陳昌倬)
On Thu, Feb 04, 2021 at 04:26:42PM +0800, ChangZhuo Chen (陳昌倬) wrote:

> Hi,
>
> We have problem connecting to queryable state client proxy as described
> in [0]. Any help is appreciated.
>
> * The port 6125 is opened in taskmanager pod.
>
>   ```
>   root@<censored>-654b94754d-2vknh:/tmp# ss -tlp
>   State            Recv-Q           Send-Q                     Local Address:Port                      Peer Address:Port          Process
>   LISTEN           0                1024                             0.0.0.0:46561                          0.0.0.0:*
>   LISTEN           0                3                                0.0.0.0:9249                           0.0.0.0:*
>   LISTEN           0                1024                             0.0.0.0:6122                           0.0.0.0:*
>   LISTEN           0                1024                         10.200.11.3:9067                           0.0.0.0:*
>   LISTEN           0                1024                         10.200.11.3:6125                           0.0.0.0:*
>   LISTEN           0                1024                             0.0.0.0:38607                          0.0.0.0:*
>   ```

The problem is that Flink listens only on 10.200.11.3:6125 for the queryable
state client proxy, so we need to use the correct network address to connect to it.
Is there any way to make Flink listen on 0.0.0.0 for the queryable
state client proxy?
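
The reasoning behind this diagnosis can be sketched in a few lines of Python. The model below is deliberately simplified and is an assumption of this sketch, not Flink behavior: it is IPv4-only, ignores firewalls and network policies, and treats a wildcard bind (0.0.0.0) as accepting on every local address while a specific bind accepts only on that exact address. The function names are hypothetical; the listener addresses are copied from the `ss -tlp` output quoted above.

```python
import ipaddress
from typing import List, Sequence

# Local Address:Port column from the `ss -tlp` output quoted above.
SS_LOCAL_ADDRESSES = [
    "0.0.0.0:46561",
    "0.0.0.0:9249",
    "0.0.0.0:6122",
    "10.200.11.3:9067",
    "10.200.11.3:6125",
    "0.0.0.0:38607",
]


def accepts_from(local_address: str, target_ip: str) -> bool:
    """Return True if a listener bound to local_address accepts connections made to target_ip."""
    bind_ip, _, _port = local_address.rpartition(":")
    bound = ipaddress.IPv4Address(bind_ip)
    # 0.0.0.0 is the unspecified (wildcard) address: it matches any local IP.
    return bound.is_unspecified or bound == ipaddress.IPv4Address(target_ip)


def ports_reachable_via(
    target_ip: str, local_addresses: Sequence[str] = SS_LOCAL_ADDRESSES
) -> List[int]:
    """List the ports whose listeners accept connections made to target_ip."""
    return [
        int(addr.rpartition(":")[2])
        for addr in local_addresses
        if accepts_from(addr, target_ip)
    ]
```

Under this model, `ports_reachable_via("127.0.0.1")` omits 6125 and 9067, which matches the "Connection refused" seen for `localhost:6125`, while `ports_reachable_via("10.200.11.3")` includes both.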




Re: Cannot connect to queryable state proxy

r_khachatryan
Hi ChangZhuo,

Queryable state is exposed on the same address as the TaskManager (TM) RPC. You can change this address by modifying taskmanager.host [1].
However, I'm not sure whether setting it to 127.0.0.1 or localhost would break connectivity with the other components.


Regards,
Roman

