How to write a Sink that needs to deal with conditions in a thread-safe way?

Hussein Baghdadi
Hello,

I have some basic questions regarding Sinks in Flink.

1) To implement our own sink, which class should we extend: RichSinkFunction, RichOutputFormat, or something else?

2) We are trying to write events in batches from our sink. To do that, our overridden invoke() calls a synchronized method:

// Field, initialized elsewhere: events = new ConcurrentLinkedQueue<>();

   @Override
   public void invoke(T value) throws Exception {
       this.addEventList(value);
   }

   private synchronized void addEventList(T event) {
       events.add(event);
       // Flush when the wait interval has elapsed or the batch has reached maxBatchSize.
       if (System.currentTimeMillis() >= this.nextFlush.get() || events.size() >= this.maxBatchSize) {
           Response response = null;
           try {
               response = nakadiClient.resources().events().send(eventName, events);
           } catch (final ClientException | IllegalStateException e) {
               logger.error("Error while sending to Nakadi", e);
               throw new RuntimeException(e);
           } finally {
               if (response != null) {
                   // send() returned: start a new batch and reset the timer before
                   // closing the response, so a failing close() cannot cause the
                   // same events to be sent again.
                   events = new ConcurrentLinkedQueue<>();
                   this.nextFlush.set(System.currentTimeMillis() + this.millisecondsWait);
                   try {
                       response.responseBody().close();
                   } catch (final Exception ex) {
                       logger.error("Error while closing the Nakadi response", ex);
                   }
               }
           }
       }
   }

Re: How to write a Sink that needs to deal with conditions in a thread-safe way?

Stephan Ewen
Hi!

(1) For streaming sinks, RichSinkFunction is the best class to extend.

(2) Flink never calls invoke() on the same sink instance from multiple threads concurrently, so there is no need to synchronize.
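To illustrate points (1) and (2) without a Flink dependency, here is a minimal plain-Java sketch of the call order one sink instance sees: open() once, invoke() once per record, close() once, all from the same thread. SketchSink, TracingSink and runSubtask are hypothetical stand-ins that only mimic the shape of RichSinkFunction; they are not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical plain-Java stand-in (NOT Flink API) that mimics the call
 * order a RichSinkFunction instance sees in one parallel subtask.
 */
public class SinkLifecycleSketch {

    /** Hypothetical interface shaped like a rich sink: open, invoke per record, close. */
    public interface SketchSink<T> {
        default void open() throws Exception {}
        void invoke(T value) throws Exception;
        default void close() throws Exception {}
    }

    /** Drives one sink instance from the calling thread only, so invoke() never overlaps itself. */
    public static <T> void runSubtask(SketchSink<T> sink, List<T> records) throws Exception {
        sink.open();
        try {
            for (T record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close(); // the sketch always calls close(), even if invoke() throws
        }
    }

    /** Records each call so the order is visible. */
    public static class TracingSink implements SketchSink<String> {
        public final List<String> calls = new ArrayList<>();

        @Override
        public void open() {
            calls.add("open");            // a real sink would create its client here
        }

        @Override
        public void invoke(String value) {
            calls.add("invoke:" + value); // a real sink would buffer or send the record here
        }

        @Override
        public void close() {
            calls.add("close");           // a real sink would flush and release the client here
        }
    }

    public static void main(String[] args) throws Exception {
        TracingSink sink = new TracingSink();
        runSubtask(sink, Arrays.asList("a", "b"));
        System.out.println(sink.calls); // prints [open, invoke:a, invoke:b, close]
    }
}
```

Because a single thread drives each instance, any state the sink keeps between invoke() calls can be ordinary, unsynchronized fields.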

Stephan
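Because invoke() is not called concurrently, the batching logic from the question can be plain single-threaded code: an ArrayList instead of a ConcurrentLinkedQueue, and no synchronized. The following is a minimal sketch under that assumption; BatchingBuffer, its sender callback and the injectable clock are hypothetical names, and in a real sink invoke() would call add() while close() would call flush() so the last partial batch is not dropped. As in the original code, the time check only runs when a new event arrives; flushing an idle sink from a separate timer thread would bring back the need for synchronization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/**
 * Hypothetical single-threaded batching helper. A sink's invoke() would call
 * add() and its close() would call flush(). No synchronized and no concurrent
 * collection, because invoke() is never called concurrently on one instance.
 */
public class BatchingBuffer<T> {
    private final int maxBatchSize;
    private final long maxWaitMillis;
    private final Consumer<List<T>> sender; // hypothetical, e.g. wraps the Nakadi send call
    private final LongSupplier clock;       // injectable so the example is deterministic
    private List<T> batch = new ArrayList<>();
    private long nextFlush;

    public BatchingBuffer(int maxBatchSize, long maxWaitMillis,
                          Consumer<List<T>> sender, LongSupplier clock) {
        this.maxBatchSize = maxBatchSize;
        this.maxWaitMillis = maxWaitMillis;
        this.sender = sender;
        this.clock = clock;
        this.nextFlush = clock.getAsLong() + maxWaitMillis;
    }

    /** Adds one event and flushes if the batch is full or the wait interval has passed. */
    public void add(T event) {
        batch.add(event);
        if (batch.size() >= maxBatchSize || clock.getAsLong() >= nextFlush) {
            flush();
        }
    }

    /** Sends any buffered events, then starts a new batch and resets the timer. */
    public void flush() {
        if (!batch.isEmpty()) {
            sender.accept(batch);      // if this throws, the batch is kept
            batch = new ArrayList<>(); // fresh list; the sender may keep the old one
        }
        nextFlush = clock.getAsLong() + maxWaitMillis;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake clock, advanced by hand
        List<List<String>> sent = new ArrayList<>();
        BatchingBuffer<String> buffer =
                new BatchingBuffer<>(3, 1_000L, sent::add, () -> now[0]);

        buffer.add("a");
        buffer.add("b");
        buffer.add("c");   // batch full: sends [a, b, c]
        buffer.add("d");
        now[0] = 1_500L;
        buffer.add("e");   // wait interval elapsed: sends [d, e]
        buffer.add("f");
        buffer.flush();    // as close() would: sends [f]
        System.out.println(sent); // prints [[a, b, c], [d, e], [f]]
    }
}
```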


In reply to Hussein Baghdadi's message of Thu, Mar 2, 2017 at 4:53 PM.