Re: Flink Kafka consumer with low latency requirement

Posted by xwang355 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-Kafka-consumer-with-low-latency-requirement-tp28368p28399.html

      env.enableCheckpointing(2000);
      FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("kf-events", new SimpleStringSchema(), properties2);
      ...
      messageStream.rebalance().map(new MapFunction<String, String>() {
          ...
          @Override
          public String map(String value) throws Exception {
              long tid = Thread.currentThread().getId();
              doWork(tid);
              return value; // pass the record through unchanged
          }
      }).setParallelism(2).print();
...

// The thread with the lowest tid sleeps 60 s when counter reaches 1000.
// sortedTid (assumed: a thread-safe SortedSet<Long>, e.g. ConcurrentSkipListSet)
// and counter are static fields not shown here.
private static void doWork(long tid) throws InterruptedException
{
    if (!sortedTid.contains(tid)) {
        sortedTid.add(tid);
    }

    // simulate a straggler: make the thread with the lowest tid a slow processor
    if (sortedTid.first() == tid) {
        if (counter++ == 1000) {
            Thread.sleep(60_000); // 60 s
        }
        Thread.sleep(20);
    } else {
        Thread.sleep(20);
    }
}

By using setParallelism(2), I expect to have two threads processing records from the same Kafka partition. However, if one thread is choked in doWork, the other thread cannot make progress either. As a result, the consumer offset lag starts to build up.
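A possible explanation, sketched under assumptions: setParallelism(2) on the map gives two map subtasks, but the partition is still read by a single source subtask, and rebalance() hands records to the two map subtasks round-robin over bounded network buffers. Once the straggler's buffers are full, the source blocks (backpressure), so the healthy subtask is starved as well. The plain-Java toy below models this with one bounded queue per subtask; it is not Flink code, the class and method names are made up for illustration, and the buffer size is arbitrary.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of messageStream.rebalance() feeding two map subtasks (plain Java, not Flink).
// The source deals records round-robin into one bounded queue per subtask;
// put() blocking on a full queue stands in for Flink's backpressure.
class RoundRobinStallDemo {

    /** Returns how many records the healthy subtask processed before the pipeline jammed. */
    static int healthyProcessed(int bufferCapacity, long windowMillis) throws InterruptedException {
        List<BlockingQueue<Long>> buffers = List.of(
                new ArrayBlockingQueue<>(bufferCapacity),
                new ArrayBlockingQueue<>(bufferCapacity));
        AtomicInteger healthyCount = new AtomicInteger();

        Thread source = new Thread(() -> {
            try {
                for (long offset = 0; ; offset++) {
                    // Round-robin: even offsets go to subtask 0, odd offsets to subtask 1.
                    // Blocks as soon as the target queue is full.
                    buffers.get((int) (offset % 2)).put(offset);
                }
            } catch (InterruptedException e) {
                // shutdown requested
            }
        });
        Thread straggler = new Thread(() -> {
            try {
                buffers.get(0).take();    // takes one record...
                Thread.sleep(60_000);     // ...then hangs, like the 60 s sleep in doWork
            } catch (InterruptedException e) {
                // shutdown requested
            }
        });
        Thread healthy = new Thread(() -> {
            try {
                while (true) {
                    buffers.get(1).take();
                    healthyCount.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // shutdown requested
            }
        });

        List<Thread> threads = List.of(source, straggler, healthy);
        for (Thread t : threads) t.start();
        Thread.sleep(windowMillis);       // let the pipeline run until it jams
        for (Thread t : threads) t.interrupt();
        for (Thread t : threads) t.join();
        return healthyCount.get();
    }
}
```

With 4 slots per queue, healthyProcessed(4, 500) should return 5: the healthy subtask receives the odd offsets 1 to 9, then the source blocks trying to hand offset 10 to the stuck subtask and nothing more arrives.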

What I'd like to achieve is that the 'healthy' thread continues to make progress to some extent (not forever): even though the choked thread is holding back the earlier offset from being committed, the subsequent records in the partition still get processed.
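The "bounded progress past a stuck record" idea can be sketched as an offset tracker: records may finish out of order, the committable offset never moves past the oldest unfinished record, and reading stops once it gets maxLead offsets ahead of that record. OutOfOrderOffsetTracker is a hypothetical helper, not a Flink or Kafka API; it assumes the offsets of one partition are started in increasing order and follows Kafka's convention that the committed offset is the next offset to read.

```java
import java.util.TreeSet;

// Hypothetical helper (not a Flink or Kafka API): lets records of one partition
// finish out of order while the commit position waits for the oldest pending record.
class OutOfOrderOffsetTracker {
    private final TreeSet<Long> pending = new TreeSet<>(); // started but not finished
    private final long maxLead;    // how far past the oldest pending record we may run
    private long highestStarted;   // largest offset handed to a worker so far

    OutOfOrderOffsetTracker(long startOffset, long maxLead) {
        this.maxLead = maxLead;
        this.highestStarted = startOffset - 1;
    }

    /** Starts `offset` unless that would run maxLead or more past the oldest pending record. */
    boolean tryStart(long offset) {
        if (!pending.isEmpty() && offset - pending.first() >= maxLead) {
            return false; // window full: stop reading, i.e. apply backpressure
        }
        pending.add(offset);
        highestStarted = Math.max(highestStarted, offset);
        return true;
    }

    /** Marks `offset` as done, whether or not earlier offsets have finished. */
    void complete(long offset) {
        pending.remove(offset);
    }

    /** Kafka-style commit position: the next offset a restarted consumer should read. */
    long committableOffset() {
        return pending.isEmpty() ? highestStarted + 1 : pending.first();
    }
}
```

For example, with new OutOfOrderOffsetTracker(0, 5): offset 0 hangs while offsets 1 to 4 finish, so committableOffset() stays at 0 and tryStart(5) returns false until offset 0 completes, after which the commit position jumps to 5. If the job restarts before that, offsets 1 to 4 are read again, i.e. at-least-once processing.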

If the unhealthy thread eventually aborts its record, it is OK to reprocess those offsets. For example, if doWork does credit card fraud checking, I don't want one bad transaction to hold up every customer's credit card usage.
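The "abort and reprocess" part can be sketched with the JDK alone: give a bounded batch of records a shared time budget, keep the results that finish in time, and cancel (interrupt) the rest so their offsets can be retried. check() is a hypothetical stand-in for doWork(), with offset 3 playing the bad transaction; the batch size, thread count and budget are arbitrary. Inside Flink, the closest built-in I know of is Async I/O (AsyncDataStream.unorderedWait with a timeout and a capacity); treat that as a pointer to check against the docs rather than a drop-in fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class BatchWithTimeout {

    // Hypothetical stand-in for doWork(): the record at offset 3 is the straggler.
    static String check(long offset) throws InterruptedException {
        Thread.sleep(offset == 3 ? 60_000 : 20);
        return "ok-" + offset;
    }

    /**
     * Runs one bounded batch on `threads` workers. Results that finish within
     * `budgetMillis` go into `done`; offsets that were cancelled or failed go into `retry`.
     */
    static void runBatch(List<Long> offsets, int threads, long budgetMillis,
                         List<String> done, List<Long> retry) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (long offset : offsets) {
                tasks.add(() -> check(offset));
            }
            // invokeAll returns when all tasks finish or the budget expires,
            // cancelling every task that has not completed by then.
            List<Future<String>> futures = pool.invokeAll(tasks, budgetMillis, TimeUnit.MILLISECONDS);
            for (int i = 0; i < futures.size(); i++) {
                Future<String> f = futures.get(i);
                if (f.isCancelled()) {
                    retry.add(offsets.get(i)); // aborted: reprocess later
                } else {
                    try {
                        done.add(f.get());
                    } catch (ExecutionException e) {
                        retry.add(offsets.get(i)); // the check itself threw
                    }
                }
            }
        } finally {
            pool.shutdownNow(); // interrupt anything still sleeping
        }
    }
}
```

Calling runBatch with offsets 0 to 9, 2 threads and a 1000 ms budget should leave nine "ok-…" results in done and only offset 3 in retry; the healthy records are not held up by the stuck one, and the batch size bounds how far they can run ahead.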
