I was testing with Flink 1.9. Here is how I set up mini cluster int port = 6124; int parallelism = 2; Configuration config = new Configuration(); config.setInteger(JobManagerOptions.PORT, port); config.setString(JobManagerOptions.ADDRESS, "localhost"); config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism); // In a non MiniCluster setup queryable state is enabled by default. config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069"); config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2); config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2); config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9067"); config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2); config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2); MiniClusterConfiguration clusterconfig = new MiniClusterConfiguration(config, 1, RpcServiceSharing.DEDICATED, null); try { // Create a local Flink server MiniCluster flinkCluster = new MiniCluster(clusterconfig); // Start server and create environment flinkCluster.start(); StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", port); env.setParallelism(parallelism); // Build Graph buildGraph(env); JobGraph jobGraph = env.getStreamGraph().getJobGraph(); // Submit to the server and wait for completion JobSubmissionResult result = flinkCluster.submitJob(jobGraph).get(); System.out.println("Job ID : " + result.getJobID()); Thread.sleep(Long.MAX_VALUE); } catch (Throwable t){ t.printStackTrace(); } And have a client, that looks like follows: def query(job: String, keys: Seq[String], host: String = "127.0.0.1", port: Int = 9069, timeInterval: Long = defaulttimeInterval): Unit = { // JobID, has to correspond to a running job val jobId = JobID.fromHexString(job) // Client val client = new QueryableStateClient(host, port) But when I tried it, it gives an exception that nothing is listening on port 9069 It works with the old FlinkLocalMiniCluster, but not with the MiniCluster Am I missing something? |
Hi, I think you should check TM log first and check if there are some info like: 1430 [main] INFO org.apache.flink.queryablestate.server.KvStateServerImpl - Started Queryable State Server @ /127.0.0.1:9069. 1436 [main] INFO org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl - Started Queryable State Proxy Server @ /127.0.0. Best, Guowei Boris Lublinsky <[hidden email]> 于2019年4月15日周一 上午4:02写道:
|
Thanks Guowei
The questions that I am asking is slightly different: 1. Does Mini cluster support queryable state? 2. If the answer is yes, how to set it up?
|
Hi, 1. I think Mini cluster supports queryable state. 2. You could set queryable-state.enable to true and try again. You could check AbstractQueryableStateTestBase and there are some tests. :) Best, Guowei Boris Lublinsky <[hidden email]> 于2019年4月16日周二 下午9:09写道:
|
Thanks thats it.
|
Free forum by Nabble | Edit this page |