Dear Flink experts,
I am testing the following code:
env.enableCheckpointing(2000);
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("kf-events", new SimpleStringSchema(), properties2);
...
messageStream.rebalance().map(new MapFunction<String, String>() {
    ...
    @Override
    public String map(String value) throws Exception {
        long tid = Thread.currentThread().getId();
        doWork(tid);  // synchronous call that can block for a long time
        ...
    }
}).setParallelism(2).print();
By using setParallelism(2), I expect two threads to process records from the same Kafka partition. However, if one thread is choked in doWork, the other thread cannot make progress either, and consumer offset lag starts to build up as a result.
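For reference, here is a minimal stand-in for doWork that reproduces the behavior. This is only a simulation: my real doWork is a fraud check, and the "100th record is bad" trigger below is just a way to choke one subtask.

import java.util.concurrent.atomic.AtomicLong;

private static final AtomicLong SEEN = new AtomicLong();

// Hypothetical stand-in: pretend the 100th record seen in this JVM is a
// "bad" one that never completes, choking whichever map subtask gets it.
private static void doWork(long tid) throws Exception {
    if (SEEN.incrementAndGet() == 100) {
        Thread.sleep(Long.MAX_VALUE);  // choked thread holds its offset forever
    }
    Thread.sleep(5);                   // healthy records finish quickly
}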
What I'd like to achieve is that the 'healthy' thread continues to make progress to some extent (not forever): even though the choked thread is holding an earlier offset back from being committed, the subsequent records in the partition still get processed.
If the unhealthy thread eventually aborts, it is OK to reprocess those offsets. For example, doWork does credit card fraud checking, and I don't want one bad transaction to hold up all customers' credit card usage.
My question is:
1) Is making doWork a synchronous (blocking) call a bad practice? (I sketched the async alternative I have in mind below.)
2) Is there a parameter I can tune to cap the maximum number of uncommitted offsets? This would dictate how many offsets might be reprocessed.
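For context on question 1, below is roughly the non-blocking variant I was considering, using Flink's Async I/O API (AsyncDataStream / RichAsyncFunction). This is only a sketch: doWorkAsync is a hypothetical version of my doWork that returns a CompletableFuture<String> instead of blocking, and I have not tested the timeout/capacity settings.

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

DataStream<String> results = AsyncDataStream.unorderedWait(
        messageStream.rebalance(),
        new RichAsyncFunction<String, String>() {
            @Override
            public void asyncInvoke(String value, ResultFuture<String> resultFuture) {
                // doWorkAsync is hypothetical: a non-blocking fraud check
                // that returns CompletableFuture<String>
                doWorkAsync(value).whenComplete((result, err) -> {
                    if (err != null) {
                        resultFuture.completeExceptionally(err);
                    } else {
                        resultFuture.complete(Collections.singleton(result));
                    }
                });
            }
        },
        30, TimeUnit.SECONDS,  // a stuck record times out (fails the job by default)
        100);                  // at most 100 in-flight records
results.print();

If I understand the docs correctly, unorderedWait emits results as they complete, so records behind a slow one are not blocked, and the capacity (100 here) bounds how many records can be in flight at once, which seems related to my question 2 about capping how much gets reprocessed.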
Thanks a lot
Ben