Hi Folks,
We are using Flink to capture the various interactions of a customer with an e-commerce store, i.e. product views and orders created. We run a 24-hour sliding window that slides every 5 minutes, which makes 288 parallel windows for a single tenant. We implement a fold method that maintains several hashmaps holding per-customer statistics; as soon as an e-commerce event arrives, the fold method updates the statistics in those hashmaps.

Considering 1000 tenants, we have two solutions in mind:

1) Implement one Flink job per tenant. 1000 tenants would then create 1000 Flink jobs.
2) Implement a single Flink job with keyBy 'tenant' so that each tenant gets a separate window. But this ends up creating 1000 * 288 windows per 24-hour period, which puts extra load on a single Flink job.

What is the recommended approach to handle multitenancy in Flink at such a scale, with over 1000 tenants, while storing the fold state for each event? Solution 1 would require significant effort to keep track of 1000 Flink jobs and provide resilience.

Thanks.

Best Regards,
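For reference, a minimal sketch of option 2 as described in the question. Event, EcommerceEventSource, TenantStats, and the update() method are hypothetical placeholders, and WindowedStream.fold() shown here is deprecated in favor of aggregate() in later Flink versions:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class MultiTenantStatsJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Event, EcommerceEventSource and TenantStats are hypothetical
            // placeholders for the types described in the question.
            DataStream<Event> events = env.addSource(new EcommerceEventSource());

            events
                .keyBy(e -> e.getTenantId())  // one key space per tenant
                .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
                .fold(new TenantStats(), (stats, event) -> stats.update(event)) // update() must return the stats object
                .print();

            env.execute("multi-tenant e-commerce stats");
        }
    }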
Hi Ahmad,

Some tricks that might help to bring down the effort per tenant if you run one job per tenant (or key per tenant):

- Pre-aggregate records in a 5-minute tumbling window. However, pre-aggregation does not work for FoldFunctions.
- Implement the window as a custom ProcessFunction that maintains a state of 288 events and aggregates and retracts the pre-aggregated records.

Best, Fabian

2018-07-03 15:22 GMT+02:00 Ahmad Hassan <[hidden email]>:
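A minimal sketch of the pre-aggregation trick, assuming the per-tenant statistic is a simple count — a case where, unlike a generic fold result, the 5-minute partial results can be combined later. Event and getTenantId() are the same hypothetical placeholders as above:

    import org.apache.flink.api.common.functions.AggregateFunction;

    // One partial count per tenant and 5-minute slice. Unlike a FoldFunction
    // result, these slices can be combined (added and retracted) downstream.
    public class CountPerSlice implements AggregateFunction<Event, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event event, Long acc) {
            return acc + 1;
        }

        @Override
        public Long getResult(Long acc) {
            return acc;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    // Usage, with the placeholder types from the sketch above:
    // events.keyBy(e -> e.getTenantId())
    //       .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    //       .aggregate(new CountPerSlice());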
Hi Fabian,

The one-job-per-tenant model soon becomes hard to maintain. For example, 1000 tenants would require 1000 Flink jobs, and providing HA and resilience for 1000 jobs is not a trivial task. This is why we are hoping to have a single Flink job handle all tenants through keyBy on tenant. However, that also does not scale with a growing number of tenants, as it puts all the load on a single Flink job. So I was wondering how other users handle multitenancy in Flink at scale.

Best Regards,

On Wed, 4 Jul 2018 at 11:40, Fabian Hueske <[hidden email]> wrote:
Would it be feasible for you to partition your tenants across jobs, for example 100 customers per job?

On 04.07.2018 12:53, Ahmad Hassan wrote:
Hi Chesnay,

Yes, this is something we would eventually be doing, and we would then maintain the configuration of which tenants are mapped to which Flink jobs. This would reduce the number of Flink jobs to maintain in order to support thousands of tenants in our use case. Thanks.

On Wed, 4 Jul 2018 at 12:00, Chesnay Schepler <[hidden email]> wrote:
Hi Fabian,

> On 4 Jul 2018, at 11:39, Fabian Hueske <[hidden email]> wrote:
>
> - Pre-aggregate records in a 5 minute Tumbling window. However, pre-aggregation does not work for FoldFunctions.
> - Implement the window as a custom ProcessFunction that maintains a state of 288 events and aggregates and retracts the pre-aggregated records.
>
> Best, Fabian

We are finally implementing a ProcessFunction to replace Flink's sliding window. Could you elaborate on how we can implement the sliding window as a ProcessFunction as you explained above? I am struggling to understand how I will keep track of which events belong to which window. We have a 24-hour running sliding window with a 5-minute slide (288 windows). How do I emulate 288 windows with a 5-minute slide in a ProcessFunction? The 288 sliding windows cause Flink checkpoints to hang and never finish, even after an hour and even with MapState on RocksDB. So we decided to get rid of the sliding window and implement the sliding-window logic in a ProcessFunction.

Best,
Hi Ahmad,

First of all, you need to pre-aggregate the data in a 5-minute tumbling window. For example, if your aggregation function is count or sum, this is simple: you have a 5-minute tumbling window that just emits a count or sum every 5 minutes.

The ProcessFunction then has a MapState<Integer, IntermediateAgg> (called buffer). IntermediateAgg is the result type of the tumbling window, and the MapState is used like an array, with the Integer key being the position pointer to the value. You only use the pointers 0 to 287 to store the 288 intermediate aggregation values, so the MapState acts as a ring buffer. For that you need a ValueState<Integer> (called pointer) that points to the position that is overwritten next. Finally, you have a ValueState<Result> (called result) that stores the result of the last window.

When the ProcessFunction receives a new intermediate result, it performs the following steps:

1) Get the oldest intermediate result: buffer.get(pointer)
2) Overwrite the oldest intermediate result with the newly received one: buffer.put(pointer, new-intermediate-result)
3) Increment the pointer by 1, resetting it to 0 when it reaches 288
4) Subtract the oldest intermediate result from the result
5) Add the newly received intermediate result to the result, update the result state, and emit the result

Note, this only works for certain aggregation functions. Depending on the function, you cannot pre-aggregate, which is a hard requirement for this approach.

Best, Fabian

Am Do., 1. Aug. 2019 um 20:00 Uhr schrieb Ahmad Hassan <[hidden email]>:
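A minimal sketch of this ring-buffer ProcessFunction, assuming the intermediate aggregate and the result are both plain Long sums; the class and state names are illustrative, not from the thread:

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Receives one pre-aggregated Long per tenant and 5-minute slice, and
    // maintains the 24-hour sliding sum over the last 288 slices.
    public class SlidingSumFunction extends KeyedProcessFunction<String, Long, Long> {

        private static final int NUM_SLICES = 288; // 24h / 5min

        private transient MapState<Integer, Long> buffer; // ring buffer of slice aggregates
        private transient ValueState<Integer> pointer;    // position overwritten next
        private transient ValueState<Long> result;        // current 24-hour aggregate

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("buffer", Integer.class, Long.class));
            pointer = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("pointer", Integer.class));
            result = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("result", Long.class));
        }

        @Override
        public void processElement(Long slice, Context ctx, Collector<Long> out) throws Exception {
            int p = pointer.value() == null ? 0 : pointer.value();

            Long oldest = buffer.get(p);          // 1) oldest slice (null until the buffer is full)
            buffer.put(p, slice);                 // 2) overwrite it with the new slice
            pointer.update((p + 1) % NUM_SLICES); // 3) advance the pointer, wrapping at 288

            long current = result.value() == null ? 0L : result.value();
            current -= oldest == null ? 0L : oldest; // 4) retract the slice that fell out of 24h
            current += slice;                        // 5) add the new slice
            result.update(current);
            out.collect(current);                    //    ... and emit the updated result
        }
    }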
Hi Fabian,

Thanks for this detail. However, our pipeline keeps track of the list of products seen in 24 hours with a 5-minute slide (288 windows):

    inStream
        .filter(Objects::nonNull)
        .keyBy(TENANT)
        .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
        .trigger(TimeTrigger.create())
        .evictor(CountEvictor.of(1))
        .process(new MetricProcessWindowFunction());

The trigger fires only in onElement, and MetricProcessWindowFunction just stores the stats for each product within MapState and emits only when a window reaches its expiry. The evictor just empties the window, as all product state lives in the MapState. Flink 1.7.0 checkpointing simply hangs and expires while processing our pipeline.

With your proposed solution, how would we achieve this sliding-window mechanism of emitting a 24-hour window every 5 minutes using a ProcessFunction?

Best,

On Fri, 2 Aug 2019 at 09:48, Fabian Hueske <[hidden email]> wrote:
Ok, I won't go into the implementation details.

The idea is to track all products that were observed in the last five minutes (i.e., their unique product ids) in a five-minute tumbling window. Every five minutes, the observed products are sent to a ProcessFunction that collects the data of the last 24 hours and updates the current result by adding the data of the latest 5 minutes and removing the data of the 5 minutes that fell out of the 24-hour window.

I don't know your exact business logic, but this is the rough scheme that I would follow.

Cheers, Fabian

Am Fr., 2. Aug. 2019 um 12:25 Uhr schrieb Ahmad Hassan <[hidden email]>:
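A sketch of the pre-aggregation half of this scheme, assuming a hypothetical ProductView event type with a getProductId() accessor; each 5-minute tumbling window would emit the set of distinct product ids for the downstream ProcessFunction to add and retract:

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.flink.api.common.functions.AggregateFunction;

    // Collects the distinct product ids seen within one 5-minute slice.
    // ProductView is a hypothetical event type with a getProductId() accessor.
    public class DistinctProducts implements AggregateFunction<ProductView, Set<String>, Set<String>> {

        @Override
        public Set<String> createAccumulator() {
            return new HashSet<>();
        }

        @Override
        public Set<String> add(ProductView view, Set<String> acc) {
            acc.add(view.getProductId());
            return acc;
        }

        @Override
        public Set<String> getResult(Set<String> acc) {
            return acc;
        }

        @Override
        public Set<String> merge(Set<String> a, Set<String> b) {
            a.addAll(b);
            return a;
        }
    }

    // The downstream ProcessFunction can then keep 288 of these sets in its
    // ring buffer, adding the newest set and retracting the one that fell
    // out of the 24-hour window.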
Hi Fabian,

Thank you, we will look into it now.

Best,

On Fri, 2 Aug 2019 at 12:50, Fabian Hueske <[hidden email]> wrote:
Hi Fabian,

In this case, how do we emit the tumbling-window result when there are no events? Otherwise it is not possible to emulate a sliding window in the ProcessFunction and advance the ring buffer every 5 minutes when there are no events. I could create a periodic source function, but how can it be associated with all the keyed windows?

Thanks. Best,
Hi Ahmad,

The ProcessFunction should not rely on new records coming in (i.e., it should not do the processing in the processElement() method) but rather register a timer every 5 minutes and perform the processing when the timer fires in onTimer(). Essentially, you'd only collect the data in processElement() and process it in onTimer(). You need to make sure that timers stay registered as long as there is data in the ring buffer.

Best, Fabian

Am Do., 15. Aug. 2019 um 19:20 Uhr schrieb Ahmad Hassan <[hidden email]>:
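A sketch of this timer-driven variant, again with a plain Long sum as the slice aggregate; the ring-buffer update itself is elided since it matches the earlier sketch, and all names are illustrative:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Collects incoming values in processElement() and slides the window from
    // onTimer(), so the 24-hour aggregate keeps advancing even for quiet tenants.
    public class TimerDrivenSlidingSum extends KeyedProcessFunction<String, Long, Long> {

        private static final long SLIDE_MS = 5 * 60 * 1000L; // 5-minute slide

        private transient ValueState<Long> pending; // sum collected since the last timer
        private transient ValueState<Long> timerTs; // currently registered timer, if any

        @Override
        public void open(Configuration parameters) {
            pending = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("pending", Long.class));
            timerTs = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("timerTs", Long.class));
        }

        @Override
        public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
            // Only collect here; the sliding logic runs in onTimer().
            long p = pending.value() == null ? 0L : pending.value();
            pending.update(p + value);

            // Make sure a timer is registered for the next slide boundary.
            if (timerTs.value() == null) {
                long ts = ctx.timerService().currentProcessingTime() + SLIDE_MS;
                ctx.timerService().registerProcessingTimeTimer(ts);
                timerTs.update(ts);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
            long slice = pending.value() == null ? 0L : pending.value();
            pending.clear();

            // ... update the 288-slot ring buffer with `slice` exactly as in the
            // earlier sketch, retract the oldest slot, and emit the new result.

            // Re-register the timer; a full implementation would stop once the
            // ring buffer no longer holds any data for this key.
            long next = timestamp + SLIDE_MS;
            ctx.timerService().registerProcessingTimeTimer(next);
            timerTs.update(next);
        }
    }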
Thank you, Fabian. This works really well.

Best Regards,

On Fri, 16 Aug 2019 at 09:22, Fabian Hueske <[hidden email]> wrote:
Great! Thanks for the feedback.

Cheers, Fabian

Am Mo., 19. Aug. 2019 um 17:12 Uhr schrieb Ahmad Hassan <[hidden email]>:
Flink's sliding window didn't work well for our use case at SAP, as checkpointing freezes with 288 sliding windows per tenant. Implementing the sliding window via a tumbling window and a ProcessFunction reduces the checkpointing time to a few seconds. We will see how that scales with 1000 or more tenants to get a better idea of the scalability.

Best Regards,

On Mon, 19 Aug 2019 at 16:16, Fabian Hueske <[hidden email]> wrote: