I'm having a problem with querying state on Flink 1.6. I put a project on GitHub that is my best representation of the very simple client example outlined in the 'Querying State' section of the 1.6 documentation at https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html . The GitHub project is at https://github.com/jolson787/qs . My problem: I know the queryable state server and proxy server have started on my Flink 1.6 test rig (1 job manager, 1 task manager), because I see 'Started Queryable State Server' and
'Started Queryable State Proxy Server' in the task manager logs. I know the ports are open on the local machine, because I can telnet to them. From a remote machine, I implemented the QueryableStateClient as in the example and made a getKvState call. Nothing I do, either when creating the client or when calling getKvState, seems to register: no response, no errors thrown, no lines in the log, no returned futures, no timeouts, etc. I know the proxy server and state server ports are NOT open to the remote machine, yet the client still doesn't react. Can someone take a quick look at my very simple GitHub project and see if anything jumps out at them? Beer is on me at Flink Forward if someone can help me work through this. |
Hi Joe, it looks as if the queryable state server binds to the local loopback address. This looks like a bug to me. Could you maybe share the complete cluster entrypoint and task manager logs with me? In the meantime you could try the following: change AbstractServerBase.java:227 to `.localAddress(port)`. This should bind to the wildcard address, i.e. to all local interfaces. Next, build your own Flink distribution by running `mvn clean package -DskipTests` and then go to either build-target or flink-dist/target/flink-1.7-SNAPSHOT-bin/flink-1.7-SNAPSHOT to find the distribution. Cheers, Till On Thu, Aug 30, 2018 at 12:12 AM Joe Olson <[hidden email]> wrote:
|
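To make the difference behind Till's suggested change concrete, here is a minimal sketch using only `java.net` (not Flink's or Netty's actual code). A server socket bound to one specific address, such as the loopback address, cannot be reached from other machines, whereas one bound with only a port listens on the wildcard address, i.e. all local interfaces. Netty's `ServerBootstrap.localAddress(int)` builds `new InetSocketAddress(port)`, which is the wildcard case. The class name `BindAddressDemo` and the use of ephemeral port 0 are illustrative assumptions.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Illustrative sketch (hypothetical class): loopback-only bind vs. wildcard bind.
// Port 0 asks the OS for any free port; a real server would use its configured port.
class BindAddressDemo {

    // Binds to the loopback interface only: remote machines cannot connect.
    static ServerSocket bindLoopback() throws IOException {
        ServerSocket socket = new ServerSocket();
        socket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        return socket;
    }

    // Binds to the wildcard address (0.0.0.0 or ::), so every local interface accepts
    // connections. This mirrors what Netty's localAddress(int port) does.
    static ServerSocket bindWildcard() throws IOException {
        ServerSocket socket = new ServerSocket();
        socket.bind(new InetSocketAddress(0));
        return socket;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket loopback = bindLoopback(); ServerSocket wildcard = bindWildcard()) {
            System.out.println("loopback-only bind: " + loopback.getLocalSocketAddress());
            System.out.println("wildcard bind:      " + wildcard.getLocalSocketAddress());
        }
    }
}
```

Running it on a task manager host and checking with `netstat` or `ss` from the shell shows the same distinction as Flink's logs: `127.0.0.1:<port>` versus `0.0.0.0:<port>` (or `[::]:<port>`).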
I took a brief look at why the queryable state server would bind to the loopback address. Both the QS server and org.apache.flink.runtime.io.network.netty.NettyServer bind to a local address derived from the TM address. That address comes from the "taskmanager.hostname" configuration override or, by default, from the RpcService address. A possible explanation is that, on Joe's machine, Java's `InetAddress.getLocalHost()` resolves to the loopback address. I believe Java's behavior varies in that regard. Hope this helps! On Thu, Aug 30, 2018 at 1:27 AM Till Rohrmann <[hidden email]> wrote:
|
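One quick way to test Eron's hypothesis on the task manager host is a small diagnostic like the one below (a sketch; the class name `LocalHostCheck` is made up). It prints what `InetAddress.getLocalHost()` resolves to and lists the non-loopback addresses on active interfaces. If the first turns out to be a loopback address, setting `taskmanager.hostname` to one of the listed addresses is one way to override it, assuming that address is routable from the client machine.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

// Diagnostic sketch (hypothetical class): what does getLocalHost() resolve to here,
// and which non-loopback addresses could a task manager bind to instead?
class LocalHostCheck {

    // Collects every non-loopback address on interfaces that are up and not loopback.
    static List<InetAddress> nonLoopbackAddresses() throws SocketException {
        List<InetAddress> result = new ArrayList<>();
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return result; // some JDK versions return null when no interfaces exist
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            if (!nic.isUp() || nic.isLoopback()) {
                continue;
            }
            for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                if (!addr.isLoopbackAddress()) {
                    result.add(addr);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws SocketException {
        try {
            InetAddress local = InetAddress.getLocalHost();
            System.out.println("getLocalHost(): " + local
                    + " (loopback: " + local.isLoopbackAddress() + ")");
        } catch (UnknownHostException e) {
            // The hostname itself may not resolve, which is also worth knowing.
            System.out.println("getLocalHost() failed: " + e.getMessage());
        }
        System.out.println("non-loopback candidates: " + nonLoopbackAddresses());
    }
}
```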
Hi Joe,
Did the problem get resolved in the end? Thanks, Kostas
|
Kostas - Till's advice got me past my first problem. I'm still having issues on the client side. I've got your example code from [1] in a GitHub project [2]. My problem differs from David Anderson's above in that my QueryableStateClient runs on a remote machine, not on localhost (the client runs on a different machine than any of the Flink processes). Assuming a queryable state client is allowed to run on a different machine, I haven't been able to get QueryableStateClient or getKvState to react at all: no errors, even if I put in a bogus IP address, bogus port, etc. On Mon, Sep 10, 2018 at 7:13 AM Kostas Kloudas <[hidden email]> wrote:
|
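Since the client reports nothing even for a bogus IP address or port, it can help to check reachability from the client machine independently of Flink, as a programmatic equivalent of the telnet test mentioned at the start of the thread. This sketch (the class name `PortProbe` is hypothetical) attempts a TCP connect with an explicit timeout, so a wrong host, a closed port or a firewall shows up as `false` instead of silence. The fallback port 9069 assumes Flink's default `queryable-state.proxy.ports` setting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Telnet-style reachability probe (hypothetical helper) with an explicit timeout.
class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, connect timeouts and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the task manager host and proxy port; the defaults are assumptions.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9069;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```

If this returns `false` from the client machine while telnet works locally on the task manager, the problem is the bind address or the network path rather than the client code.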
Hi Joe, what is the current problem you are facing? Cheers, Till On Wed, Sep 12, 2018 at 12:18 AM Joe Olson <[hidden email]> wrote:
|
Hi Joe,
It would also help a lot if you could share a few more details about your setup and the code of your job, or a minimal example that reproduces the problem. Thanks, Kostas
|
Kostas - Thanks for the help. I've got a project on GitHub [1], which is basically the Flink 1.6 QueryableStateClient example from [2]. Making a simple Flink 1.6 getKvState call on a QueryableStateClient that was initialized with the IP address of another machine results in nothing happening: no future returned, no timeout error, no 'host does not exist' error, nothing. The debug log (included in README.md in [1]) for a very trivial getKvState call shows that Netty is bound to loopback, even though the QueryableStateClient was initialized with a remote IP address. Per Eron's suggestion, I've also included the output of InetAddress.getLocalHost() in the debug log; it resolves to the proper machine name / IP address. I'm not sure why Netty is bound to loopback. This explains why David Anderson's case does work: the QueryableStateClient and the Flink server(s) are on the same machine. On Wed, Sep 12, 2018 at 4:19 AM Kostas Kloudas <[hidden email]> wrote:
|
Hi Joe, the code [1] looks as if you don't properly wait for the future to complete. That way, the client has no chance to establish the connection to the server. Try calling resultFuture.get(), which will block until the result arrives. Cheers, Till On Fri, Sep 14, 2018 at 5:25 PM Joe Olson <[hidden email]> wrote:
|
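Till's point can be illustrated without Flink: an asynchronous call returns immediately, and if the program never waits on the returned future, main() can finish before anything is printed, logged or thrown. In the sketch below, `queryRemoteState` is a hypothetical stand-in for `QueryableStateClient.getKvState(...)` (which in Flink 1.6 also returns a `CompletableFuture`); the key, the returned value and the simulated delay are made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of Till's advice: block on the future returned by an asynchronous query
// instead of letting main() return right after issuing the request.
class AwaitQueryResult {

    // Hypothetical stand-in for getKvState(...): simulates a remote lookup with a delay.
    static CompletableFuture<Long> queryRemoteState(String key) {
        return CompletableFuture.supplyAsync(() -> {
            sleepQuietly(200); // pretend network round trip
            return (long) key.length();
        });
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static long queryAndWait(String key, long timeoutSeconds)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<Long> resultFuture = queryRemoteState(key);
        // Without this call, main() may return (and the JVM may exit) before the
        // result is ever observed, so nothing is printed, logged or thrown.
        return resultFuture.get(timeoutSeconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("state value: " + queryAndWait("some-key", 10));
    }
}
```

Using `get(timeout, unit)` rather than a bare `get()` also means a failed or stalled query surfaces as an `ExecutionException` or a `TimeoutException` instead of hanging indefinitely.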