Queryable state support in Flink 1.9

Boris Lublinsky
I was testing with Flink 1.9. Here is how I set up the MiniCluster:

int port = 6124;
int parallelism = 2;
Configuration config = new Configuration();
config.setInteger(JobManagerOptions.PORT, port);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
// In a non MiniCluster setup queryable state is enabled by default.
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069");
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2);

config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9067");
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2);

MiniClusterConfiguration clusterconfig =
    new MiniClusterConfiguration(config, 1, RpcServiceSharing.DEDICATED, null);
try {
    // Create a local Flink server
    MiniCluster flinkCluster = new MiniCluster(clusterconfig);
    // Start server and create environment
    flinkCluster.start();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", port);
    env.setParallelism(parallelism);
    // Build Graph
    buildGraph(env);
    JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    // Submit to the server and wait for completion
    JobSubmissionResult result = flinkCluster.submitJob(jobGraph).get();
    System.out.println("Job ID : " + result.getJobID());
    Thread.sleep(Long.MAX_VALUE);
} catch (Throwable t) {
    t.printStackTrace();
}

And I have a client that looks like this:

def query(job: String, keys: Seq[String], host: String = "127.0.0.1", port: Int = 9069,
            timeInterval: Long = defaulttimeInterval): Unit = {

    // JobID, has to correspond to a running job
    val jobId = JobID.fromHexString(job)
    // Client
    val client = new QueryableStateClient(host, port)

But when I try it, I get an exception saying that nothing is listening on port 9069.

It works with the old FlinkLocalMiniCluster, but not with the MiniCluster.

Am I missing something?



Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

Re: Queryable state support in Flink 1.9

Guowei Ma
Hi,
I think you should check the TaskManager (TM) log first and see whether it contains messages like:
1430 [main] INFO  org.apache.flink.queryablestate.server.KvStateServerImpl  - Started Queryable State Server @ /127.0.0.1:9069.
1436 [main] INFO  org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl  - Started Queryable State Proxy Server @ /127.0.0.


Best,
Guowei
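
Alongside the log check suggested above, a plain TCP probe can confirm whether anything is actually listening on the proxy port. The following is a minimal sketch that uses only the JDK; the class name PortProbe, the method isListening, and the one-second timeout are illustrative assumptions and are not part of Flink.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Flink): checks whether a TCP port accepts connections.
public class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable: treat as "not listening".
            return false;
        }
    }

    public static void main(String[] args) {
        // 9069 is the proxy port configured in the original post.
        System.out.println("9069 listening: " + isListening("127.0.0.1", 9069, 1000));
    }
}
```

If the probe reports that the port is closed while the job is running, the queryable state proxy was most likely never started, which points to a configuration problem rather than a client problem.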


Boris Lublinsky <[hidden email]> wrote on Monday, April 15, 2019 at 4:02 AM:
Re: Queryable state support in Flink 1.9

Boris Lublinsky
Thanks, Guowei.
The questions I am asking are slightly different:
1. Does the MiniCluster support queryable state?
2. If the answer is yes, how do I set it up?

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Apr 15, 2019, at 12:07 AM, Guowei Ma <[hidden email]> wrote:

Re: Queryable state support in Flink 1.9

Guowei Ma
Hi,

1. I think the MiniCluster supports queryable state.
2. You could set queryable-state.enable to true and try again.
You could also look at AbstractQueryableStateTestBase, which contains some tests that show the setup.
:)

Best,
Guowei
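
For reference, the suggestion above can be written out as configuration keys. This is a sketch under assumptions: the port values are the ones from the original post, and `queryable-state.proxy.ports` and `queryable-state.server.ports` are assumed to be the keys behind QueryableStateOptions.PROXY_PORT_RANGE and SERVER_PORT_RANGE. In the MiniCluster setup from the original post the flag would go on the Configuration object instead of into a config file, for example with `config.setBoolean("queryable-state.enable", true)`, which is an assumption about how to apply it and is not quoted from the thread.

```
# Assumed flink-conf.yaml-style rendering of the settings discussed in this thread
queryable-state.enable: true
queryable-state.proxy.ports: 9069
queryable-state.server.ports: 9067
```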


Boris Lublinsky <[hidden email]> wrote on Tuesday, April 16, 2019 at 9:09 PM:
Re: Queryable state support in Flink 1.9

Boris Lublinsky
Thanks, that's it.

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Apr 16, 2019, at 8:31 AM, Guowei Ma <[hidden email]> wrote:
