Hi guys, I have one more question for you; maybe someone has already implemented such a feature or found a good technique.
I wrote an integration test that runs a Flink job, which reads data from a Kafka topic and flushes it to the file system using BucketingSink.
I implemented some custom logic that fires on notifyCheckpointComplete and would like to test it, so I need to block the job somehow and wait until a checkpoint has been performed.
The first idea, which I've implemented, is to set the checkpointing interval to 1 second and extend the test source so that it waits for a few seconds once all test messages have been sent to the sink.
The code looks something like this:
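import java.io.IOException;
import java.util.Collection;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.io.CollectionInputFormat;

public class SleepingCollectionInputFormat<T> extends CollectionInputFormat<T> {

    private static final long serialVersionUID = -5957191172818298164L;

    // How long (in milliseconds) to keep the source alive after the last element.
    private final long duration;

    public SleepingCollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer, long duration) {
        super(dataSet, serializer);
        this.duration = duration;
    }

    @Override
    public boolean reachedEnd() throws IOException {
        boolean reached = super.reachedEnd();
        if (reached) {
            try {
                // Delay the end of input so that a checkpoint can (hopefully) complete.
                Thread.sleep(duration);
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating the failure.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for a checkpoint", e);
            }
        }
        return reached;
    }
}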
This is not good enough, because it gives no guarantee that the job is actually blocked and that a checkpoint is triggered while it waits.
In my opinion, a much better approach would be to send some kind of notification to the task manager, once all items from the source have been sent, that it is time to perform a checkpoint.
Maybe someone knows how such a feature could be implemented?
Another option would be to use a CountDownLatch: count down on the checkpoint notification and await it at the end of the source function. However, serialization makes this too complicated for me right now.
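A minimal sketch of the latch idea, assuming the whole job runs inside a single JVM (as with a local test cluster). The latch lives in a static field, so it is never serialized along with the function objects. All names here (CheckpointLatchSketch, NotifyingSide, WaitingSide, roundTrip) are hypothetical stand-ins rather than Flink classes; in a real test, the countDown would go into the sink's notifyCheckpointComplete and the await into the end of the source.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CheckpointLatchSketch {

    // Static, so it is never part of a serialized function instance.
    // Assumption: source and sink run in the same JVM and class loader.
    private static volatile CountDownLatch latch = new CountDownLatch(1);

    /** Call from the test before submitting the job. */
    public static void reset(int expectedNotifications) {
        latch = new CountDownLatch(expectedNotifications);
    }

    /** Stand-in for the sink: the real one would do this in notifyCheckpointComplete. */
    public static class NotifyingSide implements Serializable {
        private static final long serialVersionUID = 1L;

        public void notifyCheckpointComplete(long checkpointId) {
            latch.countDown();
        }
    }

    /** Stand-in for the source: the real one would await after emitting its last element. */
    public static class WaitingSide implements Serializable {
        private static final long serialVersionUID = 1L;
        private final long timeoutMillis;

        public WaitingSide(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        /** Returns true if a checkpoint was confirmed before the timeout expired. */
        public boolean awaitCheckpoint() throws InterruptedException {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Java-serialization round trip, imitating how function objects get copied to tasks. */
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        reset(1);
        WaitingSide source = roundTrip(new WaitingSide(100));
        NotifyingSide sink = roundTrip(new NotifyingSide());

        // No notification yet, so this wait times out.
        System.out.println("before notification: " + source.awaitCheckpoint());
        sink.notifyCheckpointComplete(1L);
        // The deserialized copies still share the static latch.
        System.out.println("after notification: " + source.awaitCheckpoint());
    }
}
```

The timeout on await is deliberate: if no checkpoint ever completes, the test fails after a bounded wait instead of hanging. This would not work on a real multi-JVM cluster, since each task manager would get its own copy of the static latch.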
I'd be very grateful for your replies, answers and recommendations.
Thanks!