Hi there,

In my use case, I read data from Kafka, where each Kafka partition has ascending timestamps. Currently, I parse the data from Kafka with a custom deserialization schema so that, after parsing, the FlinkKafkaConsumerBase can extract the ascending event-time timestamps and create proper watermarks via the WatermarkMultiplexer (i.e. take the minimum watermark over all non-idling partitions assigned to that task).

Now we have a strange modification: the parser can change at runtime, and only with the new parser can I extract the timestamp field of the received byte[]. The parser change is announced to me via another Kafka topic. I immediately thought: that's a perfect use case for broadcast streams. I connect the parser config stream and, if the old parser is not able to parse the events, buffer them in the connect function until a new parser arrives. (It doesn't sound too good from an architecture point of view, but that's how it is.)

My problem is the following: if I want to use a broadcast stream, I must move my parser into a new pipeline step and can no longer parse within the KafkaDeserializationSchema. This also means that Flink/Kafka can't produce the watermarks, and I need to emulate the nice per-partition ascending watermark assigner with the downstream multiplexer myself. Am I correct? Can I "easily" plug this timestamp assigner with multiplexer logic into my stream (after broadcast parsing)? Could it also detect idle partitions like the KafkaConsumer does? Or which way would you go?

The only alternative I see is to greatly increase the complexity of my KafkaDeserializationSchema so that it also reads another Kafka topic in the background and buffers elements internally. That doesn't sound very "flinkish".

Best regards
Theo
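For readers following the thread, the logic Theo would have to emulate downstream can be sketched roughly as below. This is plain Java and not Flink's API: the class PartitionWatermarkTracker, its methods, and the idle timeout are hypothetical names chosen for illustration. It assumes every record still carries the Kafka partition it came from and that the operator sees all records of those partitions in order (i.e. no shuffle between the source and this step).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink): min-watermark over non-idle partitions. */
final class PartitionWatermarkTracker {
    private final long idleTimeoutMs;
    // Highest event timestamp seen so far, per partition.
    private final Map<Integer, Long> maxTimestamp = new HashMap<>();
    // Wall-clock time at which each partition last delivered a record.
    private final Map<Integer, Long> lastSeen = new HashMap<>();
    // Last watermark handed out; a watermark must never go backwards.
    private long emitted = Long.MIN_VALUE;

    PartitionWatermarkTracker(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Record one parsed event from the given partition. */
    void onEvent(int partition, long eventTimestamp, long nowMs) {
        maxTimestamp.merge(partition, eventTimestamp, Math::max);
        lastSeen.put(partition, nowMs);
    }

    /** Minimum over all partitions that are not idle, kept monotonic. */
    long currentWatermark(long nowMs) {
        long min = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> e : maxTimestamp.entrySet()) {
            boolean idle = nowMs - lastSeen.get(e.getKey()) > idleTimeoutMs;
            if (!idle) {
                // Subtract 1 so a later event with an equal timestamp is not late.
                min = Math.min(min, e.getValue() - 1);
            }
        }
        if (min != Long.MAX_VALUE) {
            emitted = Math.max(emitted, min);
        }
        return emitted;
    }
}
```

In a real job this state would live inside a watermark generator or process function and would be called periodically; how to wire that up depends on the Flink version in use.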
Hi Theo,

this is indeed a difficult use case.

The KafkaDeserializationSchema is actually meant mostly for deserialization and should not contain more complex logic such as joining with a different topic. Doing so would make the KafkaDeserializationSchema stateful.

But in your use case, I see no better alternative than making the KafkaDeserializationSchema more complex if per-partition watermarking should be in place.

Do the parser changes happen frequently? And how do you deal with a failure case, when the Flink job restarts and all state in the deserialization schema is lost? You might need to extend the FlinkKafkaConsumer to add another piece of state so that the parser is persisted.

Regards,
Timo

On 21.10.20 17:25, Theo Diefenthal wrote:
> Hi there,
>
> In my usecase, I read data from Kafka where in each kafka partition, I
> have ascending timestamps.
> Currently, I parse the data from Kafka with a custom deserialization
> schema so that after parsing, the FlinkKafkaConsumerBase can extract the
> eventtime ascending timestamps and create proper watermarks via the
> WatermarkMultiplexer (i.e. take the minimum watermark over all
> non-idling assigned partitions in that task).
>
> Now, we have a strange modification that the parser can change at
> runtime and only via the new parser, I can extract the timestamp field
> of the received byte[]. The parser change is told to me via another
> kafka topic.
> I immediately thought about: That's a perfect usecase for broadcast
> streams: I connect the parser config stream and buffer the events in the
> connect function if the old parser is not able to parse the events up
> until a new parser arrives. (Doesn't sound too good from architecture
> all in all, but that's how it is).
>
> My problem is the following: If I want to use broadcast stream, I must
> outsource my parser to a new pipeline step and don't parse within the
> KafkaDeserializationSchema any longer.
> This also means that Flink/Kafka
> can't produce the watermarks and I need to emulate the nice per
> partition ascending watermark assigner with the downstream multiplexer
> myself. Am I correct? Can I "easily" plugin to my stream (after
> broadcast parsing) this timestamp assigner with multiplexer logic? Could
> it also detect idle partitions like the KafkaConsumer? Or which way
> would you go? The only alternative I see is to greatly increase the
> complexity of my KafkaDeserializationSchema to also read another kafka
> topic in background and as well buffer elements internally.. Sounds not
> very "flinkish".
>
> Best regards
> Theo
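Whichever place the parser ends up in (a broadcast connect function or a stateful deserialization schema), the buffering step discussed in this thread might look roughly like the sketch below. It is plain Java rather than Flink code, and BufferingParser, onRecord and onParserUpdate are hypothetical names. It assumes a parser signals "cannot parse this record" by returning an empty Optional, and it holds back every record behind an unparsable one so that the per-partition order stays ascending.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Function;

/** Hypothetical sketch: buffers raw records until a parser that understands them arrives. */
final class BufferingParser<T> {
    private Function<byte[], Optional<T>> parser;
    // Raw records waiting for a newer parser, in arrival order.
    private final Queue<byte[]> pending = new ArrayDeque<>();

    BufferingParser(Function<byte[], Optional<T>> initialParser) {
        this.parser = initialParser;
    }

    /** Returns the parsed record, or nothing if it had to be buffered. */
    List<T> onRecord(byte[] raw) {
        List<T> out = new ArrayList<>();
        if (!pending.isEmpty()) {
            // Something older is still waiting: queue behind it to preserve order.
            pending.add(raw);
            return out;
        }
        Optional<T> parsed = parser.apply(raw);
        if (parsed.isPresent()) {
            out.add(parsed.get());
        } else {
            pending.add(raw);
        }
        return out;
    }

    /** Installs the new parser and releases buffered records it can parse, in order. */
    List<T> onParserUpdate(Function<byte[], Optional<T>> newParser) {
        this.parser = newParser;
        List<T> out = new ArrayList<>();
        while (!pending.isEmpty()) {
            Optional<T> parsed = parser.apply(pending.peek());
            if (!parsed.isPresent()) {
                break; // still unparsable: keep waiting for the next update
            }
            out.add(parsed.get());
            pending.poll();
        }
        return out;
    }
}
```

As Timo points out, both the current parser and the pending buffer are state that is lost on restart unless it is persisted; the sketch leaves checkpointing out entirely.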