Hi team
I'm consuming two Kafka topics with Kafka consumers, joining them, and publishing the result to a new topic via a KafkaProducer (with exactly-once semantics).
Since setting a uid on each operator is highly recommended, I'm curious how this works. For example:
val topicASource = env
  .addSource(topicAConsumer)
  .uid("topicAConsumer")
// topic B needs its own consumer and its own uid (uids must be unique within a job)
val topicBSource = env
  .addSource(topicBConsumer)
  .uid("topicBConsumer")
val result = joinstream(env, topicASource, topicBSource)
  .uid("transformer")
val topicCSink = result
  .addSink(topicCProducer)
  .uid("topicCProducer")
In this code, is it necessary to set the uid of the transformer? If the consumer offsets are not committed until the records are finally published to the sink, will having the consumers replay from the offsets of the previous checkpoint guarantee exactly-once, even though the transformer state is lost on restart?
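For context, here is a minimal sketch of the checkpointing setup that the question presupposes. It is an illustration only, not the actual job configuration: the object name CheckpointSetupSketch and the 60-second interval are assumed placeholders, and the sources, join, and sink from the snippet above are left out.

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical wrapper object, used only for this illustration.
object CheckpointSetupSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Take a checkpoint every 60 seconds (assumed placeholder interval)
    // in exactly-once mode. Each checkpoint snapshots the Kafka source
    // offsets together with the state of every stateful operator.
    env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)

    // The sources, the join ("transformer"), and the sink would be
    // wired up here, followed by env.execute(...).
  }
}
```

With checkpointing enabled, the offsets the sources resume from on recovery are the ones stored in the checkpoint, which is why the question turns on whether the transformer's state can be matched back to its operator as well.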