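Hi,

I have a Flink pipeline that reads from a Kafka topic, applies a map operation (which builds an Elasticsearch model), and sinks the result to Elasticsearch.

Pipeline-1:

Flink-Kafka-Connector-Consumer(topic1) (parallelism 8) -> Map (parallelism 8) -> Flink-Es-connector-Sink(es1) (parallelism 8)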
Now I want some messages to be prioritized (processed quickly, not necessarily in any order). I am okay with creating a new topic and placing the priority messages in it, or with using partition-based buckets (e.g. https://github.com/riferrei/bucket-priority-pattern, though I don't think that is possible with the Flink Kafka connector, since partition assignment happens inside FlinkKafkaConsumerBase).

I tried the following solution: I created another topic (topic2, in which I placed the priority messages) and, with it, a new Flink pipeline.

Pipeline-2:

Flink-Kafka-Connector-Consumer(topic2) (parallelism 8) -> Map (parallelism 8) -> Flink-Es-connector-Sink(es1) (parallelism 8)
But the problem is that I want to consume topic2 as soon as possible, and I can accept delay/slowness on topic1 because of that. If there are no messages in topic2, then topic1 should get more priority. In the setup above, however, both pipelines are processed equally. Increasing the parallelism of Pipeline-2 to a large number doesn't help: when there are no messages in topic2, topic1 is still very slow (the parallelism given to topic2 is wasted).

How can I achieve this using the Flink Kafka connector? Is it possible to achieve it in any other way?

Regards,
Vignesh
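For readers unfamiliar with the partition-based bucket idea mentioned in the question, here is a rough, hedged sketch of the general concept only: the partitions of one topic are split into buckets by percentage, and a producer sends each message to a partition inside the bucket for its priority. This is not code from the linked repository and not anything in the Flink Kafka connector; the class name `BucketPartitions`, its methods, and the 70/30 split are hypothetical choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of partition buckets; not the linked project's code.
class BucketPartitions {

    // Splits partitions 0..numPartitions-1 into contiguous buckets sized by percentage.
    // The last bucket takes whatever partitions remain, so rounding never loses any.
    static List<List<Integer>> allocate(int numPartitions, int[] percentages) {
        List<List<Integer>> buckets = new ArrayList<>();
        int next = 0;
        for (int i = 0; i < percentages.length; i++) {
            boolean last = (i == percentages.length - 1);
            int size = last
                    ? numPartitions - next
                    : Math.round(numPartitions * percentages[i] / 100.0f);
            List<Integer> bucket = new ArrayList<>();
            for (int p = 0; p < size && next < numPartitions; p++) {
                bucket.add(next++);
            }
            buckets.add(bucket);
        }
        return buckets;
    }

    // Picks a partition inside one bucket in round-robin fashion.
    static int partitionFor(List<Integer> bucket, int counter) {
        if (bucket.isEmpty()) {
            throw new IllegalStateException("bucket has no partitions");
        }
        return bucket.get(Math.floorMod(counter, bucket.size()));
    }

    public static void main(String[] args) {
        // Assumed example: 10 partitions, 70% for priority traffic, 30% for the rest.
        List<List<Integer>> buckets = allocate(10, new int[] {70, 30});
        System.out.println("priority bucket: " + buckets.get(0));
        System.out.println("normal bucket:   " + buckets.get(1));
    }
}
```

The priority effect in such a scheme comes from giving more partitions (and therefore more consumers) to the priority bucket, which is why it depends on controlling partition assignment on the consumer side.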
Hi Vignesh,

I'm adding Aljoscha to the thread; he might have an idea how to solve this with the existing Flink APIs. (The closest idea I had was the N-ary stream operator, but I guess that doesn't support backpressuring individual upstream operators. Side inputs would be needed for that?)

The only somewhat feasible idea I came up with, which only makes sense if you don't need any exactly-once guarantees, is implementing your own Kafka connector (or forking the existing Kafka connector in Flink, in which case you could also get exactly-once). In this custom Kafka connector you could, conceptually, have two Kafka consumers, each feeding messages into its own bounded queue. A third thread keeps draining the queues, always taking from the priority queue first.

Best,
Robert

(In reply to the message from Vignesh Ramesh of Thu, Oct 29, 2020 at 1:29 PM.)
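The two-queue idea above can be sketched roughly as follows. This is a minimal illustration using only the Java standard library, not a Flink or Kafka connector: the class `PriorityDrain`, its fields, and the record strings are hypothetical. In a real custom connector, two threads would each poll their own Kafka consumer and block on `put` when a queue is full, while a third thread would call something like `next()` in a loop and back off briefly when it returns null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: two bounded queues drained with strict priority.
class PriorityDrain {
    final BlockingQueue<String> high; // would be fed by the consumer of the priority topic
    final BlockingQueue<String> low;  // would be fed by the consumer of the normal topic

    PriorityDrain(int capacity) {
        // Bounded queues: a producer using put() blocks when its queue is full.
        this.high = new ArrayBlockingQueue<>(capacity);
        this.low = new ArrayBlockingQueue<>(capacity);
    }

    // Returns the next record, always preferring the priority queue.
    // Returns null if both queues are currently empty.
    String next() {
        String record = high.poll();
        return record != null ? record : low.poll();
    }

    // Drains everything currently queued, in priority order.
    List<String> drainAll() {
        List<String> out = new ArrayList<>();
        String record;
        while ((record = next()) != null) {
            out.add(record);
        }
        return out;
    }

    public static void main(String[] args) {
        PriorityDrain drain = new PriorityDrain(100);
        // Simulated arrivals; a real connector would fill these from Kafka.
        drain.low.offer("low-1");
        drain.low.offer("low-2");
        drain.high.offer("high-1");
        drain.low.offer("low-3");
        System.out.println(drain.drainAll());
    }
}
```

With strict priority like this, the normal queue is only served while the priority queue is empty, so it can be starved under sustained priority traffic. The sketch also does nothing about offset commits or checkpointing, so it gives none of the exactly-once guarantees mentioned above.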
I'm afraid there's nothing in Flink that would make this possible right now.
Have you considered whether this would be possible with the vanilla Kafka Consumer APIs? I'm not sure that their APIs allow reading messages with prioritization.

Best,
Aljoscha

(In reply to the message from Robert Metzger of 04.11.20, 08:34.)