Does making a synchronous call choke the whole pipeline?

xwang355

Dear Flink experts,

I am testing the following code:

        env.enableCheckpointing(2000);

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("kf-events", new SimpleStringSchema(), properties2);

        ...

        messageStream.rebalance().map(new MapFunction<String, String>() {

                ...

                @Override
                public String map(String value) throws Exception {
                        long tid = Thread.currentThread().getId();
                        doWork(tid);
                        ...
                }
        }).setParallelism(2).print();

By using setParallelism(2), I expected two parallel map subtasks to process records from the same Kafka partition. However, if one subtask is choked in doWork, the other cannot make progress either, and consumer offset lag starts to build up as a result.
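To make sure I understand the stall I am seeing, here is a minimal plain-Java sketch (no Flink, all names hypothetical) of why in-order dispatch to bounded buffers causes this: the producer hands out records round-robin, and once the stuck worker's buffer is full the producer blocks, so the healthy worker stops receiving records too, even though it is idle.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class BackpressureSketch {
    // Round-robin dispatch to two workers with a bounded buffer each.
    // Worker 0 never drains its buffer (it is "choked" in doWork);
    // worker 1 processes instantly. Dispatch is strictly in order,
    // so the producer stalls as soon as worker 0's buffer is full.
    static int recordsReachingHealthyWorker(int totalRecords, int bufferCapacity) {
        Deque<Integer> stuckBuffer = new ArrayDeque<>(); // worker 0: never drained
        int healthyCount = 0;                            // worker 1: drains instantly
        for (int i = 0; i < totalRecords; i++) {
            if (i % 2 == 0) {                            // record routed to worker 0
                if (stuckBuffer.size() == bufferCapacity) {
                    return healthyCount;                 // producer would block here
                }
                stuckBuffer.add(i);
            } else {
                healthyCount++;                          // worker 1 keeps up easily
            }
        }
        return healthyCount;
    }
}
```

Even with 100 records produced and a buffer capacity of 2, the healthy worker only ever sees 2 records before the producer stalls, which matches the offset lag I observe.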

What I'd like to achieve is that the 'healthy' subtask continues to make progress to some extent (not forever), even though the choked subtask is holding the earlier offset back from being committed; the subsequent records in the partition would still be processed.

If the unhealthy subtask eventually aborts, it is OK to reprocess those offsets again. For example, if doWork does credit-card fraud checking, I don't want one bad transaction to hold up all customers' credit-card usage.
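One workaround I am considering is bounding each doWork call with a per-record timeout, so a stuck record is abandoned (and can be reprocessed later) instead of stalling the subtask forever. A minimal plain-Java sketch, assuming doWork can be wrapped as a Callable (mapWithTimeout and the fallback value are hypothetical names, not Flink API):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutBoundedWork {
    // Run doWork with a per-record timeout. If it does not finish in
    // time, interrupt it and return a fallback so the caller can flag
    // the record for later reprocessing instead of blocking forever.
    static String mapWithTimeout(ExecutorService pool, Callable<String> doWork,
                                 long timeoutMs, String fallback) throws Exception {
        Future<String> f = pool.submit(doWork);
        try {
            return f.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            f.cancel(true);      // interrupt the stuck call
            return fallback;     // e.g. mark the transaction for retry
        }
    }
}
```

Would something along these lines be a reasonable way to keep one bad record from blocking the subtask, or is there a more idiomatic Flink mechanism for this?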

My questions are:
1) Is making doWork a synchronous (blocking) call a bad practice?
2) Is there a parameter I can tune for the maximum number of uncommitted offsets? That would dictate how many offsets might be reprocessed.

Thanks a lot
Ben