I’ve been using QueryableStateClient in Flink 1.2 successfully. I have now upgraded to release-1.3.0-rc0 and QueryableStateClient now requires a HighAvailabilityServices parameter.
The documentation hasn’t been updated on using HighAvailabilityServices so I’m a bit lost on what exactly I should specify for that parameter. For development, I want to connect to a Flink Job Manager that I created from a different process using StreamExecutionEnvironment.createLocalEnvironmentWithWebUI.
Can somebody provide the code needed to create the appropriate HighAvailabilityServices parameter?

I have tried the following code:

val jobManagerIpcAddress = "localhost"
val jobManagerIpcPort = 6123
configuration.setString(JobManagerOptions.ADDRESS, jobManagerIpcAddress)
configuration.setInteger(JobManagerOptions.PORT, jobManagerIpcPort)
private val highAvailabilityServices = new StandaloneHaServices(jobManagerIpcAddress, jobManagerIpcAddress)
private val client = new QueryableStateClient(configuration, highAvailabilityServices)

It results in:

Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka://flink/), Path(/localhost)]

Claudio Fahey
Chief Solutions Architect, Analytics
Dell EMC | Emerging Technologies Team
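A note on the exception: the path /localhost in the ActorSelection suggests that the bare host name passed to StandaloneHaServices was treated as an actor path. A likely reading, stated here as an assumption rather than confirmed behaviour, is that the constructor expects full Akka actor URLs instead of host names. The sketch below only shows what such a URL might look like for the address and port used above. The actor system name "flink" and the path "/user/jobmanager" are assumptions and should be checked against the Flink version in use. The class and method names are hypothetical.

```java
// Minimal sketch: build a JobManager actor URL from a host and port.
// ASSUMPTION: the actor system is named "flink" and the JobManager actor
// lives at "/user/jobmanager"; verify both against your Flink version.
class JobManagerAkkaUrl {
    static final String ACTOR_SYSTEM = "flink";
    static final String JOB_MANAGER_PATH = "/user/jobmanager";

    // Returns a URL of the form akka.tcp://<system>@<host>:<port><path>.
    static String jobManagerUrl(String host, int port) {
        return "akka.tcp://" + ACTOR_SYSTEM + "@" + host + ":" + port + JOB_MANAGER_PATH;
    }

    public static void main(String[] args) {
        // Same address and port as in the question above.
        System.out.println(jobManagerUrl("localhost", 6123));
    }
}
```

Building the HighAvailabilityServices through a factory that reads the address and port from the Configuration avoids assembling this URL by hand.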
Hi Claudio,
The documentation for this was recently updated: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/queryable_state.html#querying-state. Could you see if that helps? The important bit for you is probably this:

final HighAvailabilityServices highAvailabilityServices =
    HighAvailabilityServicesUtils.createHighAvailabilityServices(
        config,
        Executors.newSingleThreadScheduledExecutor(),
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

If that doesn’t help we’ll need to delve deeper.

Best,
Aljoscha
On 11. May 2017, at 22:21, Fahey, Claudio <[hidden email]> wrote:
Hi Claudio,
Quick follow up: querying a locally started cluster does not work out of the box anymore in Flink 1.3. You can manually start a mini cluster that has the required settings, though. You would do something like this:

Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());

configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
configuration.setString(JobManagerOptions.ADDRESS, "localhost");
configuration.setInteger(JobManagerOptions.PORT, 6123);
configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

flinkMiniCluster = new LocalFlinkMiniCluster(
    configuration,
    HighAvailabilityServicesUtils.createHighAvailabilityServices(
        configuration,
        Executors.directExecutor(),
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
    false);

flinkMiniCluster.start();

And then you can create a remote StreamExecutionEnvironment using StreamExecutionEnvironment.createRemoteEnvironment() to submit your job to that cluster.

You can stop the cluster using flinkMiniCluster.stop().

I hope this helps?

Best,
Aljoscha

On 6. Jun 2017, at 16:33, Aljoscha Krettek <[hidden email]> wrote:
Hi Claudio,
Quick question: what exactly was your call for getting the local environment with web UI? Did you also have a custom Configuration where you specified, for example, that the queryable state server should be enabled?

I can make an example work where I start a local cluster in one process (in the IDE) and then query from another process (also started in the IDE), but only if I manually start the LocalFlinkMiniCluster, as outlined in my last mail. I’m talking about Flink 1.2.x here.

Best,
Aljoscha

On 6. Jun 2017, at 17:23, Aljoscha Krettek <[hidden email]> wrote:
Sorry for yet another update, but these are the complete settings for making it work on Flink 1.3:
Configuration configuration = new Configuration();
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
configuration.setString(JobManagerOptions.ADDRESS, "localhost");
configuration.setInteger(JobManagerOptions.PORT, 6123);
configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
configuration.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);

// needed because queryable state server is always disabled with only one TaskManager
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);

FlinkMiniCluster flinkMiniCluster = new LocalFlinkMiniCluster(
    configuration,
    HighAvailabilityServicesUtils.createHighAvailabilityServices(
        configuration,
        Executors.directExecutor(),
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
    false);

flinkMiniCluster.start();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 6123);

On 7. Jun 2017, at 14:55, Aljoscha Krettek <[hidden email]> wrote: