Queryable State

Queryable State

Navneeth Krishnan
Hi All,

I'm running a streaming job on Flink 1.3.2 with a few queryable states. There are 3 task managers and a job manager. I'm getting a timeout exception when trying to query a state, along with a warning message in the job manager log.

Client:
final Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, jobMgrHost);
config.setInteger(JobManagerOptions.PORT, JobManagerOptions.PORT.defaultValue());

final HighAvailabilityServices highAvailabilityServices =
    HighAvailabilityServicesUtils.createHighAvailabilityServices(
        config,
        Executors.newSingleThreadScheduledExecutor(),
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);

Exception:
Exception in thread "main" io.netty.channel.ConnectTimeoutException: connection timed out: /172.31.18.170:43537
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

Job Manager:
2017-09-10 06:55:41,599 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@127.0.0.1:64344] has failed, address is now gated for [5000] ms. Reason: [Disassociated]

Thanks,
Navneeth


Re: Queryable State

Navneeth Krishnan
Hi All,

Any suggestions would really be helpful. Thanks

Re: Queryable State

Biplob Biswas
Hi,


Are you sure your job manager is running and is accessible from the supplied
hostname and port? If you can open the Flink UI of the job that creates
your queryable state, it should show the details of the job manager and the
port to be used in this queryable client job.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
RE: Queryable State

Marchant, Hayden
I can see the job running in the Flink UI, and I explicitly specified the port for the Job Manager. When I provided a different port, I got an Akka exception. Here, it seems that the code is getting further. I think it might be connected with how I am creating the StateDescriptor. What exactly does it mean when the KvStateLocation can't be found?

Re: Queryable State

Navneeth Krishnan
In reply to this post by Biplob Biswas
Hi,

I am sure I have provided the right job manager details, because the IP address in the connection timeout is that of the task manager where the state is kept. I guess the client is able to reach the job manager and figure out where the state is. Also, if I provide a wrong state name, I receive an unknown state exception. I couldn't find out why there is a timeout and why a warning message is logged in the job manager.
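
Since the timeout occurs on the connection to the task manager rather than the job manager, one way to narrow this down is to test plain TCP reachability from the client machine to the address shown in the exception. The following is a minimal sketch using only the JDK, with no Flink dependency. The class name PortCheck and the method canConnect are hypothetical names chosen for illustration, and the 5 second timeout is an arbitrary assumption.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink: checks whether a TCP port is reachable.
public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, connect timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: java PortCheck <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        // Assumed timeout of 5 seconds; adjust as needed.
        boolean reachable = canConnect(host, port, 5000);
        System.out.println(host + ":" + port + (reachable ? " is reachable" : " is NOT reachable"));
    }
}
```

Run from the machine that hosts the query client, for example with the address from the stack trace: java PortCheck 172.31.18.170 43537. If the port is not reachable, the problem is likely at the network level (routing or firewall rules between the client and the task manager) rather than in the client code.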

Re: Queryable State

Navneeth Krishnan
Hi,

Any idea on how to solve this issue?

Thanks

Re: Queryable State

Kostas Kloudas
In reply to this post by Navneeth Krishnan
Hi Navneeth,

If you increase the timeout, does everything work?
I suppose from your config that you are running in standalone mode, right?

Any other information about the job (e.g. code and/or size of state being fetched) and
the cluster setup that can help us pin down the problem would be appreciated.

Thanks,
Kostas

Re: Queryable State

Navneeth Krishnan
No, it doesn't work even if I increase the timeout.

The state being fetched is a Map of data and has around 100 entries in it. I have a single job manager and 3 task managers with 16 slots each running on AWS EC2. 

final TypeSerializer<String> keySerializer =
    TypeInformation.of(new TypeHint<String>() {}).createSerializer(new ExecutionConfig());
final TypeSerializer<HashMap<String, Object>> valueSerializer =
    TypeInformation.of(new TypeHint<HashMap<String, Object>>() {}).createSerializer(new ExecutionConfig());
final byte[] serializedKey =
    KvStateRequestSerializer.serializeKeyAndNamespace(
        key, keySerializer,
        VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
final FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES);
