Hello,
I have some basic questions regarding Sinks in Flink. 1) To implement our own Sink, which class to implement: RichSinkFunction, RichOutputFormat, etc .. 2) We are trying to write batches in our Sink. For that, in overrided invoke() , we are calling a synchronised function: // events = new ConcurrentLinkedQueue<>(); @Override public void invoke(T value) throws Exception { this.addEventList(value); } private synchronized void addEventList(T event) { events.add(event); if ((new Date()).getTime() >= this.nextFlush.get() || events.size() > this.maxBatchSize) { Response response = null; try { response = nakadiClient.resources().events().send(eventName, events); } catch (final ClientException | IllegalStateException e) { logger.error("Error while sending to Nakadi. Error: {}", e.getMessage()); throw new RuntimeException(e); } finally { if (response != null) { try { response.responseBody().close(); events = new ConcurrentLinkedQueue<>(); this.nextFlush.set((new Date()).getTime() + this.millisecondsWait); } catch (final Exception ex) { logger.error("Error happened while trying to close response. {}", ex); } } } } } |
Hi!
(1) RichSinkFunction is the best function for streaming sinks. (2) The "invoke()" method is never called by multiple threads concurrently. No need to synchronize. Stephan On Thu, Mar 2, 2017 at 4:53 PM, Hussein Baghdadi <[hidden email]> wrote: Hello, |
Free forum by Nabble | Edit this page |