Hi,

We have a keyed pipeline with persisted state. Is there a way to broadcast a command and collect all the values persisted in that state? The end result could be, for example, sending a fetch command to all operators and emitting the results to some sink.

Why do we need it? From time to time we might want to check whether we are missing keys, see which additional keys exist, or simply emit the current state to a table and query it.

I tried simply broadcasting a command and accessing the persisted state, but that resulted in:

java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.

Is there a good way to achieve that?

Cheers
Avi
|
Hi Avi,

did you have a look at the .connect() and .broadcast() API functionalities? They allow you to broadcast a control stream to all operators. Maybe this example [1] or other examples in this repository can help you.

Regards,
Timo

[1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java

On 26.04.19 at 07:57, Avi Levi wrote:
|
Hi Timo,

I definitely did. But if you broadcast a command and try to access the persisted state (I mean the state of the data stream, not the broadcast state), you get the exception I mentioned (java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context). E.g. doing something like

override def processBroadcastElement(value: BroadcastRequest, ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, Response]#Context, out: Collector[Response]): Unit = {

will yield that exception.

BR
Avi

On Fri, Apr 26, 2019 at 11:55 AM Timo Walther <[hidden email]> wrote:
|
Hi Avi,

I'm not convinced that you cannot emit data from keyed state when you receive a broadcast message. The Context parameter of the processBroadcastElement() method in KeyedBroadcastProcessFunction has an applyToKeyedState() method. It takes a state descriptor and a KeyedStateFunction that is applied to the state of each key, but it does not provide a Collector to emit data. Maybe you can pass the Collector of processBroadcastElement() to the KeyedStateFunction and emit records while it iterates over the key space.

Best,
Fabian

On Fri, Apr 26, 2019 at 17:35, Avi Levi <[hidden email]> wrote:
|
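For readers following along, the suggestion above can be sketched roughly as below. This is a minimal sketch rather than the code used in this thread: it assumes a Flink 1.8-era Scala DataStream API, and the case classes, the StateDumper class, the state names "last-payload" and "commands", and the wire helper are all hypothetical names chosen for illustration.

```scala
import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.runtime.state.KeyedStateFunction
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical message types, named after the ones mentioned in this thread.
case class Request(key: String, payload: String)
case class BroadcastRequest(command: String)

// Keeps the last payload per key and dumps all stored values on every broadcast command.
class StateDumper
    extends KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String] {

  // Descriptor of the keyed state; the state name is an assumption.
  private val descriptor =
    new ValueStateDescriptor[String]("last-payload", classOf[String])

  // Keyed state handle, resolved lazily on the task after deserialization.
  @transient private lazy val lastPayload: ValueState[String] =
    getRuntimeContext.getState(descriptor)

  // Keyed side: a key is set here, so direct state access is allowed.
  override def processElement(
      value: Request,
      ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String]#ReadOnlyContext,
      out: Collector[String]): Unit =
    lastPayload.update(value.payload)

  // Broadcast side: no key is set, so touching lastPayload directly would fail
  // with "No key set". applyToKeyedState visits the state of every key instead.
  override def processBroadcastElement(
      value: BroadcastRequest,
      ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String]#Context,
      out: Collector[String]): Unit =
    ctx.applyToKeyedState(descriptor, new KeyedStateFunction[String, ValueState[String]] {
      // The Collector of processBroadcastElement is captured by this closure.
      override def process(key: String, state: ValueState[String]): Unit =
        Option(state.value()).foreach(s => out.collect(s))
    })
}

object StateDumpJob {
  // Connects the keyed request stream with the broadcast command stream.
  def wire(requests: DataStream[Request],
           commands: DataStream[BroadcastRequest]): DataStream[String] = {
    // broadcast() requires a MapStateDescriptor even though this sketch
    // never stores anything in the broadcast state.
    val commandState =
      new MapStateDescriptor[String, String]("commands", classOf[String], classOf[String])
    requests
      .keyBy(_.key)
      .connect(commands.broadcast(commandState))
      .process(new StateDumper)
  }
}
```

Note that each parallel instance only iterates over the keys it owns, so the complete dump is the union of what all instances emit.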
Thanks! Works like a charm :)

On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske <[hidden email]> wrote:
|
Nice! Thanks for the confirmation :-)

On Mon, Apr 29, 2019 at 13:21, Avi Levi <[hidden email]> wrote:
|
Hi Avi:

Can you please elaborate on (or include an example/code snippet of) how you were able to collect the keyed state from the processBroadcastElement method using applyToKeyedState? I am trying to understand which collector you used to emit the state, since the broadcast elements/state might be different from the non-broadcast elements/state.

Thanks for your help.

Mans
On Monday, April 29, 2019, 7:29:23 AM EDT, Fabian Hueske <[hidden email]> wrote:
|
Sure! You get both the context and the collector in the processBroadcastElement method; see the snippet below:

override def processBroadcastElement(value: BroadcastRequest, ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String]#Context, out: Collector[String]): Unit = {

On Mon, Apr 29, 2019 at 5:45 PM M Singh <[hidden email]> wrote:
|
Thanks Avi for your help. Mans
On Tuesday, April 30, 2019, 5:57:51 AM EDT, Avi Levi <[hidden email]> wrote:
Sure! You get both the context and the collector in the processBroadcastElement method; see the snippet below:

override def processBroadcastElement(value: BroadcastRequest, ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String]#Context, out: Collector[String]): Unit = {
override def process(key: String, state: ValueState[String]): Unit = Option(state.value()).foreach(s => out.collect(s))...
}

On Mon, Apr 29, 2019 at 5:45 PM M Singh <[hidden email]> wrote:
|