Queryable State Client with 1.3.0-rc0

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Queryable State Client with 1.3.0-rc0

Fahey, Claudio

I’ve been using QueryableStateClient in Flink 1.2 successfully. I have now upgraded to release-1.3.0-rc0 and QueryableStateClient now requires a HighAvailabilityServices parameter. The documentation hasn’t been updated on using HighAvailabilityServices so I’m a bit lost on what exactly I should specify for that parameter. For development, I want to connect to a Flink Job Manager that I created from a different process using StreamExecutionEnvironment.createLocalEnvironmentWithWebUI. Can somebody provide the code needed to create the appropriate HighAvailabilityServices parameter?

 

I have tried the following code:

 

  val jobManagerIpcAddress = “localhost”

  val jobManagerIpcPort = 6123

  configuration.setString(JobManagerOptions.ADDRESS, jobManagerIpcAddress)

  configuration.setInteger(JobManagerOptions.PORT, jobManagerIpcPort)

  private val highAvailabilityServices = new StandaloneHaServices(jobManagerIpcAddress, jobManagerIpcAddress)

  private val client = new QueryableStateClient(configuration, highAvailabilityServices)

 

It results in:

 

Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka://flink/), Path(/localhost)]

 

 

Claudio Fahey
Chief Solutions Architect, Analytics
Dell EMC | Emerging Technologies Team

 

Reply | Threaded
Open this post in threaded view
|

Re: Queryable State Client with 1.3.0-rc0

Aljoscha Krettek
Hi Claudio,

The documentation for this was recently updated: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/queryable_state.html#querying-state. Could you see if that helps? The important bit for you is probably this:

final HighAvailabilityServices highAvailabilityServices =
      HighAvailabilityServicesUtils.createHighAvailabilityServices(
           config,
           Executors.newSingleThreadScheduledExecutor(),
           HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

If that doesn’t help we’ll need to delve deeper.

Best,
Aljoscha

On 11. May 2017, at 22:21, Fahey, Claudio <[hidden email]> wrote:

I’ve been using QueryableStateClient in Flink 1.2 successfully. I have now upgraded to release-1.3.0-rc0 and QueryableStateClient now requires a HighAvailabilityServices parameter. The documentation hasn’t been updated on using HighAvailabilityServices so I’m a bit lost on what exactly I should specify for that parameter. For development, I want to connect to a Flink Job Manager that I created from a different process using StreamExecutionEnvironment.createLocalEnvironmentWithWebUI. Can somebody provide the code needed to create the appropriate HighAvailabilityServices parameter?
 
I have tried the following code:
 
  val jobManagerIpcAddress = “localhost”
  val jobManagerIpcPort = 6123
  configuration.setString(JobManagerOptions.ADDRESS, jobManagerIpcAddress)
  configuration.setInteger(JobManagerOptions.PORT, jobManagerIpcPort)
  private val highAvailabilityServices = new StandaloneHaServices(jobManagerIpcAddress, jobManagerIpcAddress)
  private val client = new QueryableStateClient(configuration, highAvailabilityServices)
 
It results in:
 
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(<a href="akka://flink/" class="">akka://flink/), Path(/localhost)]
 
 
Claudio Fahey
Chief Solutions Architect, Analytics
Dell EMC | Emerging Technologies Team


Reply | Threaded
Open this post in threaded view
|

Re: Queryable State Client with 1.3.0-rc0

Aljoscha Krettek
Hi Claudio,

Quick follow up: querying a locally started cluster does not work out-of-box anymore in Flink 1.3. You can manually start a mini cluster that has the required settings, though. You would do something like this:

Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());

configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
configuration.setString(JobManagerOptions.ADDRESS, "localhost");
configuration.setInteger(JobManagerOptions.PORT, 6123);
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

flinkMiniCluster = new LocalFlinkMiniCluster(
    configuration,
    HighAvailabilityServicesUtils.createHighAvailabilityServices(
        configuration,
        Executors.directExecutor(),
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
    false);

flinkMiniCluster.start();

And then you can create a remote StreamExecutionEnvironment using StreamExecutionEnvironment.createRemoteEnvironment() to submit your job to that cluster.

You can stop the cluster using flinkMiniCluster.stop()

I hope this helps?

Best,
Aljoscha

> On 6. Jun 2017, at 16:33, Aljoscha Krettek <[hidden email]> wrote:
>
> Hi Claudio,
>
> The documentation for this was recently updated: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/queryable_state.html#querying-state. Could you see if that helps? The important bit for you is probably this:
>
> final HighAvailabilityServices highAvailabilityServices =
>       HighAvailabilityServicesUtils.createHighAvailabilityServices(
>            config,
>            Executors.newSingleThreadScheduledExecutor(),
>            HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>
> If that doesn’t help we’ll need to delve deeper.
>
> Best,
> Aljoscha
>
>> On 11. May 2017, at 22:21, Fahey, Claudio <[hidden email]> wrote:
>>
>> I’ve been using QueryableStateClient in Flink 1.2 successfully. I have now upgraded to release-1.3.0-rc0 and QueryableStateClient now requires a HighAvailabilityServices parameter. The documentation hasn’t been updated on using HighAvailabilityServices so I’m a bit lost on what exactly I should specify for that parameter. For development, I want to connect to a Flink Job Manager that I created from a different process using StreamExecutionEnvironment.createLocalEnvironmentWithWebUI. Can somebody provide the code needed to create the appropriate HighAvailabilityServices parameter?
>>  
>> I have tried the following code:
>>  
>>   val jobManagerIpcAddress = “localhost”
>>   val jobManagerIpcPort = 6123
>>   configuration.setString(JobManagerOptions.ADDRESS, jobManagerIpcAddress)
>>   configuration.setInteger(JobManagerOptions.PORT, jobManagerIpcPort)
>>   private val highAvailabilityServices = new StandaloneHaServices(jobManagerIpcAddress, jobManagerIpcAddress)
>>   private val client = new QueryableStateClient(configuration, highAvailabilityServices)
>>  
>> It results in:
>>  
>> Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka://flink/), Path(/localhost)]
>>  
>>  
>> Claudio Fahey
>> Chief Solutions Architect, Analytics
>> Dell EMC | Emerging Technologies Team
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Queryable State Client with 1.3.0-rc0

Aljoscha Krettek
Hi Claudio,

Quick question: what exactly was your call for getting the local environment with web UI? Did you also have a custom Configuration where you specified, for example, that the queryable state server should be enabled?

I can make an example work where I start a local cluster in one process (in the IDE) and then query from another process (also started in the IDE) but only if I manually start the LocalFlinkMiniCluster, as outlined in my last mail. I’m talking about Flink 1.2.x here.

Best,
Aljoscha

> On 6. Jun 2017, at 17:23, Aljoscha Krettek <[hidden email]> wrote:
>
> Hi Claudio,
>
> Quick follow up: querying a locally started cluster does not work out-of-box anymore in Flink 1.3. You can manually start a mini cluster that has the required settings, though. You would do something like this:
>
> Configuration configuration = new Configuration();
> configuration.addAll(jobGraph.getJobConfiguration());
>
> configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
> configuration.setString(JobManagerOptions.ADDRESS, "localhost");
> configuration.setInteger(JobManagerOptions.PORT, 6123);
> conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
>
> flinkMiniCluster = new LocalFlinkMiniCluster(
>    configuration,
>    HighAvailabilityServicesUtils.createHighAvailabilityServices(
>        configuration,
>        Executors.directExecutor(),
>        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
>    false);
>
> flinkMiniCluster.start();
>
> And then you can create a remote StreamExecutionEnvironment using StreamExecutionEnvironment.createRemoteEnvironment() to submit your job to that cluster.
>
> You can stop the cluster using flinkMiniCluster.stop()
>
> I hope this helps?
>
> Best,
> Aljoscha
>
>> On 6. Jun 2017, at 16:33, Aljoscha Krettek <[hidden email]> wrote:
>>
>> Hi Claudio,
>>
>> The documentation for this was recently updated: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/queryable_state.html#querying-state. Could you see if that helps? The important bit for you is probably this:
>>
>> final HighAvailabilityServices highAvailabilityServices =
>>      HighAvailabilityServicesUtils.createHighAvailabilityServices(
>>           config,
>>           Executors.newSingleThreadScheduledExecutor(),
>>           HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>>
>> If that doesn’t help we’ll need to delve deeper.
>>
>> Best,
>> Aljoscha
>>
>>> On 11. May 2017, at 22:21, Fahey, Claudio <[hidden email]> wrote:
>>>
>>> I’ve been using QueryableStateClient in Flink 1.2 successfully. I have now upgraded to release-1.3.0-rc0 and QueryableStateClient now requires a HighAvailabilityServices parameter. The documentation hasn’t been updated on using HighAvailabilityServices so I’m a bit lost on what exactly I should specify for that parameter. For development, I want to connect to a Flink Job Manager that I created from a different process using StreamExecutionEnvironment.createLocalEnvironmentWithWebUI. Can somebody provide the code needed to create the appropriate HighAvailabilityServices parameter?
>>>
>>> I have tried the following code:
>>>
>>>  val jobManagerIpcAddress = “localhost”
>>>  val jobManagerIpcPort = 6123
>>>  configuration.setString(JobManagerOptions.ADDRESS, jobManagerIpcAddress)
>>>  configuration.setInteger(JobManagerOptions.PORT, jobManagerIpcPort)
>>>  private val highAvailabilityServices = new StandaloneHaServices(jobManagerIpcAddress, jobManagerIpcAddress)
>>>  private val client = new QueryableStateClient(configuration, highAvailabilityServices)
>>>
>>> It results in:
>>>
>>> Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka://flink/), Path(/localhost)]
>>>
>>>
>>> Claudio Fahey
>>> Chief Solutions Architect, Analytics
>>> Dell EMC | Emerging Technologies Team
>>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Queryable State Client with 1.3.0-rc0

Aljoscha Krettek
Sorry for yet another update but this is the complete settings for making it work on Flink 1.3:

Configuration configuration = new Configuration();

configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
configuration.setString(JobManagerOptions.ADDRESS, "localhost");
configuration.setInteger(JobManagerOptions.PORT, 6123);
configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
configuration.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
// needed because queryable state server is always disabled with only one TaskManager
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);

FlinkMiniCluster flinkMiniCluster = new LocalFlinkMiniCluster(
      configuration,
      HighAvailabilityServicesUtils.createHighAvailabilityServices(
            configuration,
            Executors.directExecutor(),
            HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
      false);

flinkMiniCluster.start();

final StreamExecutionEnvironment env =
      StreamExecutionEnvironment.createRemoteEnvironment("localhost", 6123);

> On 7. Jun 2017, at 14:55, Aljoscha Krettek <[hidden email]> wrote:
>
> Hi Claudio,
>
> Quick question: what exactly was your call for getting the local environment with web UI? Did you also have a custom Configuration where you specified, for example, that the queryable state server should be enabled?
>
> I can make an example work where I start a local cluster in one process (in the IDE) and then query from another process (also started in the IDE) but only if I manually start the LocalFlinkMiniCluster, as outlined in my last mail. I’m talking about Flink 1.2.x here.
>
> Best,
> Aljoscha
>
>> On 6. Jun 2017, at 17:23, Aljoscha Krettek <[hidden email]> wrote:
>>
>> Hi Claudio,
>>
>> Quick follow up: querying a locally started cluster does not work out-of-box anymore in Flink 1.3. You can manually start a mini cluster that has the required settings, though. You would do something like this:
>>
>> Configuration configuration = new Configuration();
>> configuration.addAll(jobGraph.getJobConfiguration());
>>
>> configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
>> configuration.setString(JobManagerOptions.ADDRESS, "localhost");
>> configuration.setInteger(JobManagerOptions.PORT, 6123);
>> conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
>>
>> flinkMiniCluster = new LocalFlinkMiniCluster(
>>   configuration,
>>   HighAvailabilityServicesUtils.createHighAvailabilityServices(
>>       configuration,
>>       Executors.directExecutor(),
>>       HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
>>   false);
>>
>> flinkMiniCluster.start();
>>
>> And then you can create a remote StreamExecutionEnvironment using StreamExecutionEnvironment.createRemoteEnvironment() to submit your job to that cluster.
>>
>> You can stop the cluster using flinkMiniCluster.stop()
>>
>> I hope this helps?
>>
>> Best,
>> Aljoscha
>>
>>> On 6. Jun 2017, at 16:33, Aljoscha Krettek <[hidden email]> wrote:
>>>
>>> Hi Claudio,
>>>
>>> The documentation for this was recently updated: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/queryable_state.html#querying-state. Could you see if that helps? The important bit for you is probably this:
>>>
>>> final HighAvailabilityServices highAvailabilityServices =
>>>     HighAvailabilityServicesUtils.createHighAvailabilityServices(
>>>          config,
>>>          Executors.newSingleThreadScheduledExecutor(),
>>>          HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>>>
>>> If that doesn’t help we’ll need to delve deeper.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>> On 11. May 2017, at 22:21, Fahey, Claudio <[hidden email]> wrote:
>>>>
>>>> I’ve been using QueryableStateClient in Flink 1.2 successfully. I have now upgraded to release-1.3.0-rc0 and QueryableStateClient now requires a HighAvailabilityServices parameter. The documentation hasn’t been updated on using HighAvailabilityServices so I’m a bit lost on what exactly I should specify for that parameter. For development, I want to connect to a Flink Job Manager that I created from a different process using StreamExecutionEnvironment.createLocalEnvironmentWithWebUI. Can somebody provide the code needed to create the appropriate HighAvailabilityServices parameter?
>>>>
>>>> I have tried the following code:
>>>>
>>>> val jobManagerIpcAddress = “localhost”
>>>> val jobManagerIpcPort = 6123
>>>> configuration.setString(JobManagerOptions.ADDRESS, jobManagerIpcAddress)
>>>> configuration.setInteger(JobManagerOptions.PORT, jobManagerIpcPort)
>>>> private val highAvailabilityServices = new StandaloneHaServices(jobManagerIpcAddress, jobManagerIpcAddress)
>>>> private val client = new QueryableStateClient(configuration, highAvailabilityServices)
>>>>
>>>> It results in:
>>>>
>>>> Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka://flink/), Path(/localhost)]
>>>>
>>>>
>>>> Claudio Fahey
>>>> Chief Solutions Architect, Analytics
>>>> Dell EMC | Emerging Technologies Team
>>>>
>>>
>>
>