PyFlink Connection Refused to Kubernetes Session Cluster

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

PyFlink Connection Refused to Kubernetes Session Cluster

Robert Cullen

Attempting to run the word_count.py example on my kubernetes (session) cluster:

./bin/flink run \                             
--target kubernetes-session \
-Dkubernetes.cluster-id=cmdaa \
-Dkubernetes.container.image=cmdaa/pyflink:0.0.1 \
--pyModule word_count \
--pyFiles /opt/flink-1.12.2/examples/python/table/batch/word_count.py

The following exception occurs:

2021-03-04 12:51:41,465 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Retrieve flink cluster cmdaa successfully, JobManager Web Interface: http://spackler:8081
Traceback (most recent call last):
  File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/pyflink/e1f711ac-7dc7-46ed-8c4e-ed8b3880baf7/928f631e-cc11-4eb5-9234-3a1a8fb7052e/word_count.py", line 80, in <module>
    word_count()
  File "/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/pyflink/e1f711ac-7dc7-46ed-8c4e-ed8b3880baf7/928f631e-cc11-4eb5-9234-3a1a8fb7052e/word_count.py", line 74, in word_count
    t_env.execute("word_count")
  File "/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1276, in execute
  File "/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.executePipeline(BatchTableEnvImpl.scala:352)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.execute(BatchTableEnvImpl.scala:317)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.executePipeline(BatchTableEnvImpl.scala:349)
    ... 12 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    ... 1 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
    ... 21 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: spackler/192.168.88.200:8081
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    ... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: spackler/192.168.88.200:8081
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)

org.apache.flink.client.program.ProgramAbortException
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Do I need to create an ingress for pyflink?

Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: PyFlink Connection Refused to Kubernetes Session Cluster

Shuiqiang Chen
Hi Robert,

It seems the retrieved address of JobManager is a cluster-internal Ip that can noly be accessed inside the cluster. As you said, you might need to create an ingress to expose the JobManager service so that the client can access to it outside of the k8s cluster.

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月5日周五 上午2:58写道:

Attempting to run the word_count.py example on my kubernetes (session) cluster:

./bin/flink run \                             
--target kubernetes-session \
-Dkubernetes.cluster-id=cmdaa \
-Dkubernetes.container.image=cmdaa/pyflink:0.0.1 \
--pyModule word_count \
--pyFiles /opt/flink-1.12.2/examples/python/table/batch/word_count.py

The following exception occurs:

2021-03-04 12:51:41,465 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Retrieve flink cluster cmdaa successfully, JobManager Web Interface: http://spackler:8081
Traceback (most recent call last):
  File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/pyflink/e1f711ac-7dc7-46ed-8c4e-ed8b3880baf7/928f631e-cc11-4eb5-9234-3a1a8fb7052e/word_count.py", line 80, in <module>
    word_count()
  File "/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/pyflink/e1f711ac-7dc7-46ed-8c4e-ed8b3880baf7/928f631e-cc11-4eb5-9234-3a1a8fb7052e/word_count.py", line 74, in word_count
    t_env.execute("word_count")
  File "/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1276, in execute
  File "/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.executePipeline(BatchTableEnvImpl.scala:352)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.execute(BatchTableEnvImpl.scala:317)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.executePipeline(BatchTableEnvImpl.scala:349)
    ... 12 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    ... 1 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
    ... 21 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: spackler/192.168.88.200:8081
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    ... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: spackler/192.168.88.200:8081
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)

org.apache.flink.client.program.ProgramAbortException
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Do I need to create an ingress for pyflink?

Robert Cullen
240-475-4490