Hi,
I'd like to implement a custom ZooKeeper data source that reads ZooKeeper whenever the node data changes (NodeDataChanged). It doesn't work reliably yet: the thread has to sleep inside the loop, otherwise it doesn't work.

    public static class ZKSource implements SourceFunction<String> {

        private static final long serialVersionUID = 1L;
        private static String zkData;
        private static CuratorFramework client;
        private static boolean isChanged = false;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // first collect
            ctx.collect(getZKData());
            while (true) {
                Thread.sleep(1); // without this, it doesn't work
                if (isChanged) {
                    ctx.collect(getZKData());
                    isChanged = false;
                }
            }
        }

        @Override
        public void cancel() {
            client.close();
        }

        private String getZKData() throws Exception {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
            client.getCuratorListenable().addListener(new ZKListener());
            client.start();
            zkData = new String(client.getData().watched().forPath(node), StandardCharsets.UTF_8);
            return zkData;
        }

        static class ZKListener implements CuratorListener {
            @Override
            public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                EventType changeEvent = curatorEvent.getWatchedEvent().getType();
                try {
                    switch (changeEvent) {
                        case NodeDataChanged:
                            isChanged = true;
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    LOG.warn("Exception", e);
                    client.close();
                }
            }
        }
    }

I was thinking that ctx.collect() needs some time to complete, and that without Thread.sleep(), ctx.collect() doesn't get started before isChanged is reset to false. Besides, the required sleep time may vary depending on how busy the Flink cluster is, which doesn't sound very robust.

I'd be glad to hear about a better implementation and any mistakes I have made.

Best,
Sendoh
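One possible explanation, offered as a hedged reading rather than a confirmed diagnosis: isChanged is a plain static boolean that Curator's event thread writes and the source thread reads in a tight loop. Without volatile or other synchronization, the JIT may read the field once and never observe the update. The JLS (section 17.3) notes that Thread.sleep has no synchronization semantics, so the sleep would only make the loop work by accident. The set/reset of the flag is also racy, because a change that arrives between ctx.collect(...) and isChanged = false is silently lost. A common alternative is to hand each change from the listener thread to the source thread through a BlockingQueue. The sketch below is plain Java with no Flink or Curator dependency. ChangeFeedSource, onNodeDataChanged and demo are hypothetical names, and the Consumer<String> only stands in for SourceContext#collect.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a change-driven SourceFunction (not Flink/Curator API).
class ChangeFeedSource {
    // Listener thread puts, source thread takes; the queue provides the
    // cross-thread visibility that a plain static boolean lacks, and it
    // never drops a change the way a set/reset flag can.
    private final BlockingQueue<String> changes = new LinkedBlockingQueue<>();
    // volatile so that cancel(), called from another thread, is seen by run()
    private volatile boolean running = true;
    private final String initialData;

    ChangeFeedSource(String initialData) {
        this.initialData = initialData; // assumption: first read done elsewhere
    }

    // Assumed to be called from the watcher thread on NodeDataChanged,
    // with the freshly re-read node data.
    void onNodeDataChanged(String newData) {
        changes.offer(newData);
    }

    // Emits the initial value, then one record per change, without busy-waiting.
    void run(Consumer<String> emit) throws InterruptedException {
        emit.accept(initialData);
        while (running) {
            // Blocks until a change arrives; the timeout only bounds how
            // long it takes to notice cancel().
            String data = changes.poll(100, TimeUnit.MILLISECONDS);
            if (data != null) {
                emit.accept(data);
            }
        }
    }

    void cancel() {
        running = false;
    }

    // Usage example: simulates two change events arriving from another thread.
    static List<String> demo() {
        ChangeFeedSource source = new ChangeFeedSource("v0");
        List<String> out = new CopyOnWriteArrayList<>();
        CountDownLatch emitted = new CountDownLatch(3);
        Thread sourceThread = new Thread(() -> {
            try {
                source.run(v -> {
                    out.add(v);
                    emitted.countDown();
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sourceThread.start();
        source.onNodeDataChanged("v1"); // simulated watcher events
        source.onNodeDataChanged("v2");
        try {
            emitted.await(5, TimeUnit.SECONDS); // wait for all three records
            source.cancel();
            sourceThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }
}
```

In a real SourceFunction you would probably create and start the CuratorFramework client once in run() (getZKData() above creates a new client on every call), re-read the node with getData().watched() whenever the event arrives because ZooKeeper watches fire only once, and emit inside synchronized (ctx.getCheckpointLock()) if checkpointing matters. Curator's NodeCache recipe (CuratorCache in Curator 5) may also take care of the re-watching for you.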