I just tried using the Flink SQL Client. A simple job is not running because the client cannot reach the jobmanager. I'm not sure why the SQL Client is trying to connect to "flink-jobmanager/10.98.253.58:8081". I'd expect either "flink-jobmanager:8081" or "10.98.253.58:8081" (which should work with my Kubernetes setup).

I'm using riskfocus's Flink helm chart. I have an environment file that sets "execution.type: batch". My setup works when using DataSet and DataStream. The jobmanager and taskmanager logs look fine. This looks like a configuration problem that lies either in that Flink helm chart or in the SQL Client itself.

The last SELECT in this session errors out:

Flink SQL> DROP TABLE `default_catalog.mydb.user`;
[INFO] Table has been removed.

Flink SQL> CREATE TABLE `default_catalog.mydb.user` (`platformId` BIGINT, `userId` STRING) WITH ('connector' = 'filesystem', 'path' = 's3://mys3bucket/users.csv','format' = 'csv');
[INFO] Table has been created.

Flink SQL> SELECT * FROM `default_catalog.mydb.user` LIMIT 10;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: connection timed out: flink-jobmanager/10.98.253.58:8081

My config logs from jobmanager.

2020-09-11 02:33:07,962 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using predefined options: DEFAULT.
2020-09-11 02:33:07,962 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using default options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.
2020-09-11 02:33:08,100 INFO org.apache.flink.table.client.gateway.local.ProgramDeployer [] - Submitting job org.apache.flink.streaming.api.graph.StreamGraph@3bd08435 for query default: SELECT * FROM `default_catalog.mydb.user` LIMIT 10`
2020-09-11 02:33:08,112 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
tail: log/flink--sql-client-flink-jobmanager-0.log: file truncated
2020-09-11 02:34:35,848 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
2020-09-11 02:34:35,884 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: blob.server.port, 6124
2020-09-11 02:34:35,884 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
2020-09-11 02:34:35,885 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.heap.size, 1g
2020-09-11 02:34:35,886 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1g
2020-09-11 02:34:35,886 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend, rocksdb
2020-09-11 02:34:35,886 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.dir, file:///flink_state/checkpoints
2020-09-11 02:34:35,887 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.savepoints.dir, file:///flink_state/savepoints
2020-09-11 02:34:35,887 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.async, true
2020-09-11 02:34:35,887 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.fs.memory-threshold, 1024
2020-09-11 02:34:35,888 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.fs.write-buffer-size, 4096
2020-09-11 02:34:35,888 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.incremental, true
2020-09-11 02:34:35,888 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.local-recovery, true
2020-09-11 02:34:35,889 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.num-retained, 1
2020-09-11 02:34:35,889 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.state.local.root-dirs, file:///flink_state/local-recovery
2020-09-11 02:34:35,889 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.checkpoint.transfer.thread.num, 1
2020-09-11 02:34:35,890 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.localdir, /flink_state/rocksdb
2020-09-11 02:34:35,890 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.options-factory, org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
2020-09-11 02:34:35,890 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.predefined-options, DEFAULT
2020-09-11 02:34:35,891 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.timer-service.factory, HEAP
2020-09-11 02:34:35,891 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.ttl.compaction.filter.enabled, false
2020-09-11 02:34:35,892 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, flink-jobmanager
2020-09-11 02:34:35,892 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-09-11 02:34:35,892 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.jvm-metaspace.size, 256mb
2020-09-11 02:34:35,893 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.endpoint, http://minio:9000
2020-09-11 02:34:35,893 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.path.style.access, true
2020-09-11 02:34:35,894 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.access-key, YOURACCESSKEY
2020-09-11 02:34:35,894 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.secret-key, ******
2020-09-11 02:34:35,895 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.aws.credentials.provider, org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
2020-09-11 02:34:36,492 INFO org.apache.flink.client.cli.CliFrontend [] - Loading FallbackYarnSessionCli
2020-09-11 02:34:36,498 INFO org.apache.flink.table.client.gateway.local.LocalExecutor [] - Using default environment file: file:/opt/flink/conf/sql-client-defaults.yaml
2020-09-11 02:34:37,786 INFO org.apache.flink.table.client.SqlClient [] - Using session environment file: file:/opt/flink/sql-client-defaults.yaml
2020-09-11 02:34:37,931 INFO org.apache.flink.table.client.config.entries.ExecutionEntry [] - Property 'execution.restart-strategy.type' not specified. Using default value: fallback
2020-09-11 02:34:43,269 INFO org.apache.flink.table.client.gateway.local.ExecutionContext [] - Executor config: {blob.server.port=6124, s3a.secret-key=YOURSECRETKEY, state.checkpoints.num-retained=1, s3a.access-key=YOURACCESSKEY, state.backend.rocksdb.options-factory=org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory, jobmanager.rpc.address=flink-jobmanager, taskmanager.memory.jvm-metaspace.size=256mb, state.backend.rocksdb.predefined-options=DEFAULT, state.savepoints.dir=file:///flink_state/savepoints, state.backend.fs.memory-threshold=1024, state.backend.rocksdb.timer-service.factory=HEAP, execution.savepoint.ignore-unclaimed-state=false, taskmanager.numberOfTaskSlots=2, state.backend.async=true, pipeline.classpaths=[], state.backend.rocksdb.ttl.compaction.filter.enabled=false, jobmanager.heap.size=1g, taskmanager.memory.process.size=1g, s3a.endpoint=http://minio:9000, state.backend.rocksdb.checkpoint.transfer.thread.num=1, state.backend.local-recovery=true, state.backend.rocksdb.localdir=/flink_state/rocksdb, state.backend.incremental=true, execution.target=remote, taskmanager.rpc.port=6122, jobmanager.rpc.port=6123, s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider, state.backend.fs.write-buffer-size=4096, execution.attached=true, execution.shutdown-on-attached-exit=false, pipeline.jars=[file:/opt/flink/opt/flink-sql-client_2.12-1.11.1.jar], state.backend=rocksdb, s3a.path.style.access=true, taskmanager.state.local.root-dirs=file:///flink_state/local-recovery, state.checkpoints.dir=file:///flink_state/checkpoints}
2020-09-11 02:34:43,286 INFO org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
2020-09-11 02:34:44,079 INFO org.apache.flink.table.client.cli.CliClient [] - Command history file path: /root/.flink-sql-history
2020-09-11 02:35:15,396 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using predefined options: DEFAULT.
2020-09-11 02:35:15,397 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using default options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.
2020-09-11 02:35:17,529 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
2020-09-11 02:35:17,530 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: blob.server.port, 6124
2020-09-11 02:35:17,530 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
2020-09-11 02:35:17,530 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.heap.size, 1g
2020-09-11 02:35:17,530 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1g
2020-09-11 02:35:17,530 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend, rocksdb
2020-09-11 02:35:17,530 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.dir, file:///flink_state/checkpoints
2020-09-11 02:35:17,531 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.savepoints.dir, file:///flink_state/savepoints
2020-09-11 02:35:17,531 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.async, true
2020-09-11 02:35:17,531 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.fs.memory-threshold, 1024
2020-09-11 02:35:17,531 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.fs.write-buffer-size, 4096
2020-09-11 02:35:17,531 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.incremental, true
2020-09-11 02:35:17,531 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.local-recovery, true
2020-09-11 02:35:17,532 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.num-retained, 1
2020-09-11 02:35:17,532 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.state.local.root-dirs, file:///flink_state/local-recovery
2020-09-11 02:35:17,532 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.checkpoint.transfer.thread.num, 1
2020-09-11 02:35:17,532 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.localdir, /flink_state/rocksdb
2020-09-11 02:35:17,532 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.options-factory, org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
2020-09-11 02:35:17,532 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.predefined-options, DEFAULT
2020-09-11 02:35:17,532 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.timer-service.factory, HEAP
2020-09-11 02:35:17,533 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.ttl.compaction.filter.enabled, false
2020-09-11 02:35:17,533 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, flink-jobmanager
2020-09-11 02:35:17,533 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-09-11 02:35:17,533 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.jvm-metaspace.size, 256mb
2020-09-11 02:35:17,533 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.endpoint, http://minio:9000
2020-09-11 02:35:17,534 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.path.style.access, true
2020-09-11 02:35:17,534 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.access-key, YOURACCESSKEY
2020-09-11 02:35:17,534 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.secret-key, ******
2020-09-11 02:35:17,535 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.aws.credentials.provider, org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
2020-09-11 02:35:18,047 INFO org.apache.flink.table.client.gateway.local.ProgramDeployer [] - Submitting job org.apache.flink.streaming.api.graph.StreamGraph@2f95653f for query default: SELECT * FROM `default_catalog.mydb.user` LIMIT 10`
2020-09-11 02:35:18,382 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2020-09-11 02:41:39,539 WARN org.apache.flink.runtime.rest.RestClient [] - Rest endpoint shutdown failed.
java.util.concurrent.TimeoutException: null
|
Hi Dan,

the "flink-jobmanager/10.98.253.58:8081" notation is not a problem. It is how java.net.InetAddress stringifies a resolved address (with both hostname and IP).

How did you configure the SQL client to work with a Kubernetes Session? AFAIK this is not a documented, tested and officially supported feature (this doesn't mean we should not support it -- apparently it is something we should do rather soon ;) ).

Best,
Robert

On Fri, Sep 11, 2020 at 5:25 AM Dan Hill <[hidden email]> wrote:
|
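Robert's point about the "hostname/IP" notation can be checked with a small sketch. It assumes nothing about Flink itself: the class name InetAddressFormatDemo and the helper resolved() are made up for illustration, and the address is built from raw bytes so that no DNS lookup is needed.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

// Hypothetical demo class, not part of Flink.
final class InetAddressFormatDemo {

    // Builds a resolved socket address from a host name and raw IPv4 octets.
    // InetAddress.getByAddress does not contact a name service.
    public static InetSocketAddress resolved(String host, int[] octets, int port) {
        byte[] raw = new byte[octets.length];
        for (int i = 0; i < octets.length; i++) {
            raw[i] = (byte) octets[i];
        }
        try {
            return new InetSocketAddress(InetAddress.getByAddress(host, raw), port);
        } catch (UnknownHostException e) {
            // Only thrown when the byte array has an illegal length.
            throw new IllegalArgumentException("not a valid IP address", e);
        }
    }

    public static void main(String[] args) {
        InetSocketAddress target =
                resolved("flink-jobmanager", new int[] {10, 98, 253, 58}, 8081);
        // InetAddress.toString() joins host name and literal IP with a slash.
        System.out.println(target.getAddress()); // flink-jobmanager/10.98.253.58
        // The socket address appends the port to that same form.
        System.out.println(target);              // flink-jobmanager/10.98.253.58:8081
    }
}
```

The string in the exception is therefore just a resolved address printed in the JDK's usual form, not a malformed host name.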
Hi Robert!

I have Flink running locally on minikube. I'm running SQL client using exec on the jobmanager.

kubectl exec pod/flink-jobmanager-0 -i -t -- /opt/flink/bin/sql-client.sh embedded -e /opt/flink/sql-client-defaults.yaml

Here's the sql-client-defaults.yaml. I didn't specify a session.

execution:
  type: batch
  result-mode: table
  max-table-result-rows: 1000000

I'm prototyping the Table SQL interface. I got blocked using the Table SQL interface and figured I'd try the SQL Client to see if I could get unblocked.

On Fri, Sep 11, 2020 at 11:18 AM Robert Metzger <[hidden email]> wrote:
|
Hi Dan,

Can you verify from the pod that jobmanager and 10.98.253.58:8081 are actually accessible (e.g., with curl)? I'd probably also try out localhost:8081, as you are connecting to the respective pod directly.

On Fri, Sep 11, 2020 at 9:59 PM Dan Hill <[hidden email]> wrote:
--
Arvid Heise | Senior Java Developer
|
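If curl is not available in the image, the reachability check Arvid suggests can be approximated with a plain TCP connect from Java. This is only a sketch: the class name RestPortProbe, the helper canConnect() and the 3000 ms timeout are assumptions made for illustration, and the check only shows that the port accepts connections, not that the REST API answers.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink.
final class RestPortProbe {

    // Returns true if a TCP connection to host:port opens within timeoutMillis.
    // Unresolvable hosts, refused connections and timeouts all yield false.
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Candidate addresses taken from the thread; run this inside the pod.
        String[] hosts = {"flink-jobmanager", "10.98.253.58", "localhost"};
        for (String host : hosts) {
            System.out.printf("%s:8081 reachable: %b%n",
                    host, canConnect(host, 8081, 3000));
        }
    }
}
```

If localhost:8081 is reachable while the service name or service IP is not, that would point at the Kubernetes service rather than at the SQL Client.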
Hi Dan,

I don't think the SQL Client officially supports running against Kubernetes. What you could try is using an undocumented, untested feature: put something like "jobmanager: kubernetes" into the "deployment:" section of the SQL Client configuration.

Proper support for Kubernetes, YARN etc. is (probably stalled) work in progress.

On Mon, Sep 14, 2020 at 2:18 PM Arvid Heise <[hidden email]> wrote:
|
Hi Robert!

Sorry for the delay. This worked! Thanks! I used slightly different deployment parameters.

deployment:
  gateway-address: flink-jobmanager
  gateway-port: 8081

On Mon, Sep 14, 2020 at 6:21 AM Robert Metzger <[hidden email]> wrote:
|