Need to sleep the thread to let my Flink Zookeeper datasource with NodeDataChanged work

Hung
Hi,

I'd like to implement a custom ZooKeeper data source that re-reads a node whenever its data changes (NodeDataChanged). It does not work reliably yet: the source thread has to sleep inside its loop, otherwise it doesn't work.

public static class ZKSource implements SourceFunction<String> {
    private static final long serialVersionUID = 1L;
    private static String zkData;
    private static CuratorFramework client;
    private static boolean isChanged = false;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // first collect
        ctx.collect(getZKData());
        while (true) {
            Thread.sleep(1); // without this doesn't work
            if (isChanged) {
                ctx.collect(getZKData());
                isChanged = false;
            }
        }
    }

    @Override
    public void cancel() {
        client.close();
    }

    private String getZKData() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
        client.getCuratorListenable().addListener(new ZKListener());
        client.start();

        zkData = new String(client.getData().watched().forPath(node), StandardCharsets.UTF_8);
        return zkData;
    }

    static class ZKListener implements CuratorListener {
        @Override
        public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
            EventType changeEvent = curatorEvent.getWatchedEvent().getType();
            try {
                switch (changeEvent) {
                    case NodeDataChanged:
                        isChanged = true;
                        break;
                    default:
                        break;
                }
            } catch (Exception e) {
                LOG.warn("Exception", e);
                client.close();
            }
        }
    }
}

My guess is that ctx.collect() needs some time to complete: without Thread.sleep(), ctx.collect() doesn't start, and isChanged is reset to false before ctx.collect() gets to run. Besides, the sleep time needed may vary with how busy the Flink cluster is, which does not sound very robust.
I would be glad to hear about any better implementation and any mistakes I have made.
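
Another possibility I have not ruled out: isChanged is a plain (non-volatile) static field written by the Curator event thread and read in a tight loop by the source thread, so the Java memory model does not guarantee the loop ever sees the update, and Thread.sleep() may only be hiding that. Below is a minimal sketch of the handoff I am considering instead, where the listener pushes into a BlockingQueue and the loop blocks on it. It uses plain Java only, no Flink or Curator. ChangeSignalSketch, onNodeDataChanged, runLoop and demo are names I made up for illustration, and the Consumer<String> stands in for ctx::collect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: only the thread handoff, not real Flink/Curator code.
class ChangeSignalSketch {
    // Thread-safe queue replaces the shared boolean flag.
    private final BlockingQueue<String> changes = new LinkedBlockingQueue<>();
    // volatile so that cancel() from another thread is visible to the loop.
    private volatile boolean running = true;

    // Would be called from the watcher thread, e.g. inside eventReceived().
    void onNodeDataChanged(String newData) {
        changes.offer(newData);
    }

    // Would be the body of run(); 'out' stands in for ctx::collect.
    void runLoop(Consumer<String> out) throws InterruptedException {
        // Keep going until cancelled, then drain whatever is still queued.
        while (running || !changes.isEmpty()) {
            // Blocks up to 100 ms instead of spinning or sleeping blindly.
            String data = changes.poll(100, TimeUnit.MILLISECONDS);
            if (data != null) {
                out.accept(data);
            }
        }
    }

    void cancel() {
        running = false;
    }

    // Small demo: two simulated change events, then cancel.
    static List<String> demo() {
        ChangeSignalSketch sketch = new ChangeSignalSketch();
        List<String> collected = new ArrayList<>();
        Thread source = new Thread(() -> {
            try {
                sketch.runLoop(collected::add);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        source.start();
        sketch.onNodeDataChanged("v1");
        sketch.onNodeDataChanged("v2");
        sketch.cancel();
        try {
            source.join(); // join() makes 'collected' safely readable here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this shape no event is lost if two changes arrive close together, and the loop has a way to stop when cancel() is called. In the real source I assume the client and listener would be created once rather than on every read, but I have not tested that part.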

Best,

Sendoh