Hi,
I have a keyed value state that is set for only about 1% of the total number of keys I have. Is there any way to get the values of all those states? I looked at the queryable state option, but it seems to support querying by key only.

Thanks and best regards,
Averell

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
|
Hi Averell,
AFAIK, we can't get all the key-value pairs from a ValueState, but MapState has a method called `entries` that can do this, so maybe you can use MapState as a workaround.
On May 7, 2019, 16:16 +0800, Averell <[hidden email]>, wrote:
|
Hi,

The KeyedBroadcastProcessFunction has a method to iterate over all keys of a keyed state. This method is available via the Context object of the processBroadcastElement() method. Hence you need a broadcast message to trigger the operation.

Best,
Fabian

On Thu, May 9, 2019 at 08:46, Congxian Qiu <[hidden email]> wrote:
|
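To illustrate why a broadcast message is needed to trigger the scan, here is a minimal plain-Java model. It is not Flink code: the class `KeyedStateScanModel`, its methods, and the hashCode-modulo key assignment are all assumptions made for illustration, and Flink assigns keys to parallel instances differently. The sketch only shows the idea that each parallel instance holds the state of its own keys, so a trigger has to reach every instance before all keys can be visited, and each key is then reported exactly once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink code) of a broadcast-triggered scan over keyed state. */
class KeyedStateScanModel {
    // One map per parallel instance; each key lives in exactly one of them.
    private final List<Map<String, Boolean>> partitions = new ArrayList<>();

    KeyedStateScanModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new HashMap<>());
        }
    }

    // Keyed input: the key decides which instance stores the value.
    // Hash-modulo is a simplification assumed for this sketch.
    void update(String key, boolean value) {
        int instance = Math.floorMod(key.hashCode(), partitions.size());
        partitions.get(instance).put(key, value);
    }

    // Broadcast input: every instance receives the trigger and scans
    // only the keys it owns, so no key is reported twice.
    List<String> scanOnBroadcast() {
        List<String> out = new ArrayList<>();
        for (Map<String, Boolean> local : partitions) {
            local.forEach((key, on) -> {
                if (on) {
                    out.add(key);
                }
            });
        }
        return out;
    }
}
```

In this model, calling `update("a", true)` and `update("b", false)` and then `scanOnBroadcast()` reports only `a`, regardless of the parallelism chosen.
|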
Thank you Congxian and Fabian.
@Fabian: could you please give a bit more detail? My understanding is that I should pass the context itself and an OutputTag to the KeyedStateFunction parameter of KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and send out the side output from within KeyedStateFunction.process(). Am I understanding your idea correctly?

BTW, I have another question regarding KeyedBroadcastProcessFunction best practice. I have two streams: Data and Toggle. The Toggle stream is just a keyed boolean stream, used to filter data from the Data stream. I am implementing that filter with a simple RichCoFlatMapFunction. Now I want to export the list of keys that are currently toggled on. Should I:
(1) add one more KeyedBroadcastProcessFunction operator (which has Toggle and Broadcast as its input streams), or
(2) replace the RichCoFlatMapFunction with a new KeyedBroadcastProcessFunction that has both functions, filter and export? Doing this would require unioning Toggle and Data into one single keyed stream.

Thanks and best regards,
Averell
|
Hi,

Passing a Context through a DataStream definitely does not work. You'd need to have the keyed state that you want to scan over in the KeyedBroadcastProcessFunction.

For the toggle filter use case, you would need to have a unioned stream with Toggle and StateReport events. For the output, you can use side outputs to route the different outputs to different streams.

Best,
Fabian

On Thu, May 9, 2019 at 10:34, Averell <[hidden email]> wrote:
|
Hi Fabian,

Sorry, but I am still confused by your guidance. If I union the Toggle stream with the StateReportTrigger stream, would that mean I need to make my Toggles broadcast state? Or is there some way to modify the keyed state from within the processBroadcastElement() method?

I tried to implement the other direction (which I outlined in my previous email). It seems to work, but I am not confident in it and am not sure whether it has any flaws. Could you please give your comments? In my view, this implementation causes a lot of type-casting for my main data stream, which might have a high impact on performance (my toggle is on for only about 1% of the keys, and the rate of input1.left is less than a millionth of the rate of input1.right).

/**

Thanks for your help.
Regards,
Averell

On Thu, May 9, 2019 at 7:31 PM Fabian Hueske <[hidden email]> wrote:
|
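The code attached to the previous message did not survive in the archive. As a rough illustration of the approach being discussed (one keyed stream carrying both Toggle and Data events in a tagged wrapper), here is a plain-Java model. It is not Averell's implementation and not Flink code: `UnionFilterModel`, `Event`, and all method names are hypothetical, and a `HashMap` stands in for per-key state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink code) of filtering a unioned Toggle/Data stream. */
class UnionFilterModel {
    /** Hypothetical tagged wrapper: left carries a Toggle, right carries Data. */
    static final class Event {
        final String key;
        final Boolean toggle; // non-null only for Toggle (left) events
        final String data;    // non-null only for Data (right) events

        private Event(String key, Boolean toggle, String data) {
            this.key = key;
            this.toggle = toggle;
            this.data = data;
        }

        static Event left(String key, boolean on) {
            return new Event(key, on, null);
        }

        static Event right(String key, String payload) {
            return new Event(key, null, payload);
        }

        boolean isLeft() {
            return toggle != null;
        }
    }

    // Stands in for per-key state; only toggled-on keys ever hold an entry.
    private final Map<String, Boolean> toggleState = new HashMap<>();

    /** Handles one event; returns the payload if it passes the filter, else null. */
    String process(Event e) {
        if (e.isLeft()) {
            if (e.toggle) {
                toggleState.put(e.key, true);
            } else {
                toggleState.remove(e.key);
            }
            return null;
        }
        return toggleState.containsKey(e.key) ? e.data : null;
    }

    /** What an export trigger would emit: the keys currently toggled on, sorted. */
    List<String> exportToggledKeys() {
        List<String> keys = new ArrayList<>(toggleState.keySet());
        Collections.sort(keys);
        return keys;
    }
}
```

Every Data event pays for one wrapper object and one `isLeft()` branch in this sketch, which is the kind of per-record overhead the message above is concerned about.
|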
Hi Averell,

Ah, sorry. I had assumed the toggle events were broadcast anyway. Since you have both streams keyed, your current solution looks fine to me.

Best,
Fabian

On Fri, May 10, 2019 at 03:13, Averell Huyen Levan <[hidden email]> wrote:
|
Hi Fabian,

Thanks for that. However, as I mentioned in my previous email, that implementation requires a lot of typecasting/object wrapping.

I tried broadcasting the Toggle stream instead: the toggles are saved in a MapState, and whenever an export trigger record arrives, I send out that MapState. There's no use of applyToKeyedState in this implementation. The problem is that I receive duplicated output (one copy from each parallel instance).

Is there any option to modify the keyed state from within the processBroadcastElement() method?

Thanks a lot for your help.
Regards,
Averell

On Fri, May 10, 2019 at 8:52 PM Fabian Hueske <[hidden email]> wrote:
|
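The duplicated output described above can be reproduced with a small plain-Java model. This is not Flink code, and `BroadcastDuplicationModel` and its method are hypothetical names; the sketch only assumes that every parallel instance keeps its own full copy of the broadcast toggles and that each instance emits on the export trigger.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink code) of exporting from replicated broadcast state. */
class BroadcastDuplicationModel {
    /**
     * Simulates an export trigger reaching every parallel instance.
     * Each instance emits all toggled-on keys from its own full copy,
     * so every key appears once per instance in the combined output.
     */
    static List<String> exportFromBroadcastState(Map<String, Boolean> toggles, int parallelism) {
        List<String> out = new ArrayList<>();
        for (int instance = 0; instance < parallelism; instance++) {
            // Assumption of the model: the copy on each instance is identical.
            Map<String, Boolean> localCopy = new HashMap<>(toggles);
            for (Map.Entry<String, Boolean> entry : localCopy.entrySet()) {
                if (entry.getValue()) {
                    out.add(entry.getKey());
                }
            }
        }
        return out;
    }
}
```

With two toggled-on keys and a parallelism of 3, the model emits six records, which matches the "one from each parallel instance" symptom. Removing the duplicates downstream would need an extra deduplication step.
|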
Hi Averell,

I'd go with your approach. Any state access (given that you use RocksDB keyed state) or deduplication of messages is going to be more expensive than a simple cast.

Best,
Fabian

On Fri, May 10, 2019 at 13:08, Averell Huyen Levan <[hidden email]> wrote:
|
Thank you very much, Fabian.

Regards,
Averell

On Fri, May 10, 2019 at 9:46 PM Fabian Hueske <[hidden email]> wrote:
|