Some questions regarding operator IDs

Kevin Kwon
Hi team

I'm subscribing to two topics with a Kafka consumer, joining them, and publishing the result to a new topic via a Kafka producer (with exactly-once semantics).

Since it's highly recommended to set a uid for each operator, I'm curious how this works. For example:

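import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Each source needs its own consumer and a unique uid
val topicASource = env
  .addSource(topicAConsumer)
  .uid("topicAConsumer")

val topicBSource = env
  .addSource(topicBConsumer)
  .uid("topicBConsumer")

// joinstream is a user-defined helper (not shown) that joins the two streams
val result = joinstream(env, topicASource, topicBSource)
  .uid("transformer")

val topicCSink = result
  .addSink(topicCProducer)
  .uid("topicCProducer")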


In this code, is it necessary to set the UID of the transformer? If the consumer offsets are not committed until the records are finally published to the sink, will the consumers replaying from the offsets of the previous checkpoint still guarantee exactly-once, even though the transformer state is lost when restarting?

Re: Some questions regarding operator IDs

rmetzger0
Hey Kevin,

Setting the uid is not needed for exactly-once guarantees. It is used when you want to restore operator state manually from a savepoint, for example after changing the job.
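
To illustrate why the uid only matters once the job's structure changes, here is a toy model in plain Scala (not Flink's API). The ID scheme is a hypothetical stand-in: Flink derives generated IDs from the job graph, and this sketch simply uses each operator's position, so inserting a hypothetical `filter` operator shifts every generated ID after it, while explicit uids stay stable.

```scala
// Toy model of why explicit uids matter when restoring a savepoint into a
// changed job. NOT Flink's API: the ID scheme below is a hypothetical stand-in.
object OperatorIdToy {
  // An operator with an optional user-assigned uid (like `.uid("transformer")`).
  final case class Operator(name: String, uid: Option[String] = None)

  // Crude stand-in for a generated ID: derived from the operator's position in
  // the job, so inserting an operator changes the IDs of everything after it.
  def operatorId(op: Operator, position: Int): String =
    op.uid.getOrElse(s"generated-$position")

  // Savepoint: each operator's state is stored under its operator ID.
  def savepoint(job: Seq[Operator], state: Map[String, Long]): Map[String, Long] =
    job.zipWithIndex.map { case (op, i) => operatorId(op, i) -> state(op.name) }.toMap

  // Restore: each operator reclaims whatever state is stored under its ID, if any.
  def restore(job: Seq[Operator], snapshot: Map[String, Long]): Map[String, Option[Long]] =
    job.zipWithIndex.map { case (op, i) => op.name -> snapshot.get(operatorId(op, i)) }.toMap
}
```

Without uids, restoring a savepoint of `source -> transformer -> sink` into `source -> filter -> transformer -> sink` hands the transformer's state to the wrong operator in this toy; with uids the transformer gets its own state back. The exact misassignment is an artifact of the crude ID scheme; the point is only that generated IDs are not stable across topology changes.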

This blog post (there are probably many more explaining this) may help you understand how checkpointing ensures exactly-once despite failures: https://www.ververica.com/blog/how-apache-flink-manages-kafka-consumer-offsets#:~:text=The%20Kafka%20consumer%20in%20Apache,offsets%20in%20all%20Kafka%20partitions.&text=Flink's%20checkpoint%20mechanism%20ensures%20that,on%20the%20same%20input%20data
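
Regarding the worry that the transformer state is lost on restart: with checkpointing, the source offsets and operator state (including the transformer's) are snapshotted together, and a transactional sink commits its output only when a checkpoint completes. The toy simulation below is plain Scala, not Flink code; the running count per key stands in for the join state, and `interval` and `failAt` are hypothetical parameters. It crashes once, restores offset and state from the last checkpoint, replays, and the committed output should match a failure-free run.

```scala
// Toy model (NOT Flink code) of checkpoint-based exactly-once processing.
object ExactlyOnceToy {
  // What a checkpoint captures together: the source offset and the
  // transformer's state (here, a running count per key).
  final case class Checkpoint(offset: Int, counts: Map[String, Int])

  // Process `input`, completing a checkpoint every `interval` records and at the
  // end. If `failAt` is set, the job crashes once after that many records: the
  // in-memory state and the open sink transaction are discarded, and processing
  // resumes from the last completed checkpoint.
  def run(input: Vector[String], interval: Int, failAt: Option[Int]): Vector[String] = {
    var committed = Vector.empty[String] // output visible downstream
    var pending = Vector.empty[String]   // output in the open sink transaction
    var cp = Checkpoint(0, Map.empty)    // last completed checkpoint
    var offset = cp.offset
    var counts = cp.counts
    var crashed = false
    while (offset < input.length) {
      val key = input(offset)
      val n = counts.getOrElse(key, 0) + 1
      counts = counts.updated(key, n)
      pending :+= s"$key=$n"
      offset += 1
      if (!crashed && failAt.contains(offset)) {
        // Crash: abort the open transaction, restore offset and state.
        crashed = true
        pending = Vector.empty
        offset = cp.offset
        counts = cp.counts
      } else if (offset % interval == 0 || offset == input.length) {
        // Checkpoint completes: snapshot offset + state, commit the transaction.
        cp = Checkpoint(offset, counts)
        committed ++= pending
        pending = Vector.empty
      }
    }
    committed
  }
}
```

For example, `ExactlyOnceToy.run(Vector("a", "b", "a"), 2, Some(3))` crashes after the third record, replays it from the checkpoint at offset 2, and still commits each result exactly once.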


(In reply to Kevin Kwon's message of Tue, Oct 20, 2020, 10:28 PM.)