Hi Team,
I have a Flink application reading from Kafka. Each payload is a request sent by a user containing a list of queries. What I would like to do is use Flink to process the queries in parallel, aggregate the results, and send them back to the user. For example, say we have two messages in Kafka:

message 1: "user1 - [query1, query2, query3]" should return "user1 - [result1, result2, result3]"
message 2: "user2 - [query1, query2, query3, query4]" should return "user2 - [result1, result2, result3, result4]"

My idea is to use flatMap to split out each query, keyBy user, and then aggregate. But how do I know when the aggregation is finished? If I use `countWindow`, how do I pass in the number of queries as a variable, since it is not constant?

Thanks.
- Li
Can I get any suggestions? Thanks a lot.
- Li
Hi Li,

From my view, a countWindow would not be easy to use if you have a different number of records for each key (namely, user in this case). You may need to use the low-level KeyedProcessFunction [1] to keep some state yourself. For example, each query record could also carry the total number of queries in the user's request, and in the KeyedProcessFunction you would record in state the number of queries received so far and the expected total for that user. Once enough queries have been received for a user, you know the request is completely processed, and the state for that user can then be cleaned up.

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html#the-keyedprocessfunction
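[Editor's note: the following is a rough illustrative sketch of the KeyedProcessFunction approach Yun describes, not code from the thread. It assumes each processed query arrives as a Tuple3 of (userId, result, totalQueries), with the stream keyed by userId; the class name and field layout are assumptions.]

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// Collects per-user query results and emits the aggregated list once the
// expected number of results (carried in each record) has been received.
public class CollectResultsFunction
        extends KeyedProcessFunction<String, Tuple3<String, String, Integer>, Tuple2<String, List<String>>> {

    // Results received so far for the current user.
    private transient ListState<String> results;
    // Number of results received so far for the current user.
    private transient ValueState<Integer> receivedCount;

    @Override
    public void open(Configuration parameters) {
        results = getRuntimeContext().getListState(
                new ListStateDescriptor<>("results", String.class));
        receivedCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("receivedCount", Integer.class));
    }

    @Override
    public void processElement(Tuple3<String, String, Integer> value,
                               Context ctx,
                               Collector<Tuple2<String, List<String>>> out) throws Exception {
        results.add(value.f1);
        int received = receivedCount.value() == null ? 1 : receivedCount.value() + 1;

        if (received >= value.f2) {
            // All queries of this request have been answered: emit and clean up state.
            List<String> collected = new ArrayList<>();
            results.get().forEach(collected::add);
            out.collect(Tuple2.of(value.f0, collected));
            results.clear();
            receivedCount.clear();
        } else {
            receivedCount.update(received);
        }
    }
}
```

Note that if a user can send several requests concurrently, keying by the user alone would mix their results; a per-request identifier (see the next reply) would be needed in the key.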
Hi Li,

What you can do is add the number of queries when splitting:

user1 - [query1, query2, query3] -> [user1, query1, 3], [user1, query2, 3], [user1, query3, 3]

Then, while collecting the results, you compare the current number of records in the window and emit once it reaches the expected number. Make sure that while you split, you also generate a unique key to group the results: add the same UUID (or similar) to [user1, query1, 3] etc., so that you can easily group the results even if a user sends several requests. You will probably also need to implement a custom trigger and evictor.

Let me know if you have any more questions.
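[Editor's note: as an illustration of the kind of custom trigger Arvid describes (again, not code from the thread), a sketch could look like the following. It assumes each split record is a Tuple4 of (userId, requestId, result, totalQueries); all names are hypothetical.]

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Fires (and purges) the window once the number of elements seen reaches the
// expected number of queries carried in each element.
public class CountReachedTrigger
        extends Trigger<Tuple4<String, String, String, Integer>, GlobalWindow> {

    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    @Override
    public TriggerResult onElement(Tuple4<String, String, String, Integer> element,
                                   long timestamp,
                                   GlobalWindow window,
                                   TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= element.f3) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;  // all results are in: emit and clean up
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
    }

    // Simple sum reducer for the element count kept in trigger state.
    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}
```

It would be applied roughly as `.keyBy(t -> t.f0 + "|" + t.f1).window(GlobalWindows.create()).trigger(new CountReachedTrigger())` followed by a window function that collects the results per (user, request).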
--
Arvid Heise | Senior Java Developer, Ververica