Initializing broadcast state

Nick Bendtner
Hi guys,
What is the way to initialize broadcast state (say, with default values) before the first element shows up in the broadcast stream? I look up the broadcast state to process transactions that come from another stream. The problem is that the broadcast state is empty until the first element shows up.


Best,
Nick.

Re: Initializing broadcast state

Guowei Ma
Hi, Nick
  If you have to process an element only after the broadcast state is available, you might need to handle that yourself.
  For example, you could “cache” the element in state and handle it when the elements from the broadcast side arrive. In particular, if you are using a `KeyedBroadcastProcessFunction`, you could use `applyToKeyedState` to access the elements you cached earlier.
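To make the caching idea concrete, here is a plain-Java model of the control flow with no Flink dependency. It is a sketch under stated assumptions: `pendingByKey` stands in for a keyed `ListState`, `lookup` stands in for the broadcast state, and the loop in `onBroadcast` plays the role of `applyToKeyedState`. The class, its methods and the "business logic" are hypothetical, and the model represents a single parallel instance, so it ignores checkpointing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of "cache until the broadcast side arrives" (not Flink code).
class BufferingLookup {
    private final Map<String, String> lookup = new HashMap<>();                   // stands in for broadcast state
    private final Map<String, List<String>> pendingByKey = new LinkedHashMap<>(); // stands in for keyed ListState
    private final List<String> output = new ArrayList<>();                        // stands in for the Collector

    // Main-stream element: cache it while no broadcast data has arrived yet.
    void onElement(String key, String value) {
        if (lookup.isEmpty()) {
            pendingByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        } else {
            process(key, value);
        }
    }

    // Broadcast element: update the table, then replay and clear the cache of every key.
    void onBroadcast(String ruleKey, String ruleValue) {
        lookup.put(ruleKey, ruleValue);
        for (Map.Entry<String, List<String>> entry : pendingByKey.entrySet()) {
            for (String value : entry.getValue()) {
                process(entry.getKey(), value);
            }
        }
        pendingByKey.clear();
    }

    // Hypothetical business logic: enrich the element with the rule for its key.
    private void process(String key, String value) {
        output.add(key + "=" + value + " rule=" + lookup.getOrDefault(key, "none"));
    }

    List<String> output() {
        return output;
    }

    int pendingCount() {
        int count = 0;
        for (List<String> values : pendingByKey.values()) {
            count += values.size();
        }
        return count;
    }
}
```

The readiness check here is "any broadcast entry exists"; depending on the lookup, you might instead wait for a specific key to be present before processing.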

Best,
Guowei


In reply to Nick Bendtner (Mon, Jan 25, 2021 at 10:24 AM)

Re: Initializing broadcast state

Nick Bendtner
Thanks, Guowei. Another question I have: what is the use of broadcast state when I can update a map state or value state inside the processBroadcastElement method and use that state for lookups in the processElement method, like this example?


Best,
Nick 

In reply to Guowei Ma (Sun, Jan 24, 2021 at 9:23 PM)

Re: Initializing broadcast state

Guowei Ma
Hi Nick,
Normally you cannot iterate over all keyed states, but from the broadcast side you can, using `applyToKeyedState`.
For example, before the broadcast-side elements arrive, you might cache the non-broadcast elements in keyed state. After the broadcast elements arrive, use `applyToKeyedState`[1] to iterate over all the elements you "cached" in keyed state and run your business logic.


Best,
Guowei


In reply to Nick Bendtner (Mon, Jan 25, 2021 at 12:59 PM)

Re: Initializing broadcast state

Nick Bendtner
Hi Guowei,
I am not using a keyed broadcast function; I use [1]. My question is: can non-broadcast state, for instance value state or map state, be updated whenever I get a broadcast event in processBroadcastElement? That way the state updates are consistent, since every instance of the task gets the same broadcast element.

```
private MapState<String, MyState> myState;

@Override
public void processElement(InputType value, ReadOnlyContext ctx, Collector<OutputType> out) throws Exception {
    // Iterate over the map state.
    myState.entries().forEach(entry -> {
        // Business logic
    });

    // Do things
}

@Override
public void processBroadcastElement(BroadcastedStateType value, Context ctx, Collector<OutputType> out) throws Exception {
    // Update the map state, which is not a broadcast state. Every parallel instance applies the same update.
    myState.put(value.ID(), value.state());
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Called when it's time to save state.
    myState.clear();
    // Update myState with the current application state.
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Called when things start up, possibly recovering from an error.
    MapStateDescriptor<String, MyState> descriptor =
            new MapStateDescriptor<>("state", Types.STRING, Types.POJO(MyState.class));

    myState = context.getKeyedStateStore().getMapState(descriptor);

    if (context.isRestored()) {
        // Restore application state from myState.
    }
}
```



Best,
Nick.

In reply to Guowei Ma (Sun, Jan 24, 2021 at 11:54 PM)

Re: Initializing broadcast state

Guowei Ma
Hi Nick,
I do not think you can update `myState` in `processBroadcastElement`. Updating keyed state requires a current key, and there is no key in `processBroadcastElement`.
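The constraint can be illustrated with a plain-Java model. This is a sketch with hypothetical names, not how Flink's state backends are implemented: value access is scoped to a current key, which the runtime sets before each keyed `processElement` call. A read or write with no current key therefore fails, while a visitor in the spirit of `applyToKeyedState` can walk every key without one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical model of keyed-state scoping; real Flink state backends are far more involved.
class KeyedValueStore<V> {
    private final Map<String, V> valuesByKey = new HashMap<>();
    private String currentKey; // set before keyed processing; null on the broadcast side

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    // Like ValueState.value(): reads only the entry of the current key.
    V value() {
        return valuesByKey.get(requireKey());
    }

    // Like ValueState.update(): writes only the entry of the current key.
    void update(V value) {
        valuesByKey.put(requireKey(), value);
    }

    // In the spirit of applyToKeyedState: visits every key and needs no current key.
    void applyToAllKeys(BiConsumer<String, V> function) {
        valuesByKey.forEach(function);
    }

    private String requireKey() {
        if (currentKey == null) {
            throw new IllegalStateException("No current key: keyed state needs a keyed context");
        }
        return currentKey;
    }
}
```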
Best,
Guowei


In reply to Nick Bendtner (Tue, Jan 26, 2021 at 6:31 AM)

Re: Initializing broadcast state

Nick Bendtner
Thanks a lot, Guowei, that makes sense. I will go with the caching approach. Can you point me to an example that shows the most efficient way to cache elements?
Thanks a ton for your help.

Best,
Nick

In reply to Guowei Ma (Mon, Jan 25, 2021 at 10:38 PM)

Re: Initializing broadcast state

Jaffe, Julian

One thing to consider is using a CoProcessFunction instead of a BroadcastProcessFunction, and calling .broadcast() on the input stream that you want every parallel instance to receive. Then you could follow the pattern you laid out in your sample code (e.g., initialize state in the initializeState function, update myState in processElement2, and do your business logic in processElement1). You would still need some way to initialize your state with cached values, but you would have needed that anyway with the code sample you shared.
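The property this suggestion relies on is that `broadcast()` delivers every element to every parallel instance of the downstream operator, whereas key-based partitioning delivers each element to exactly one instance. A hypothetical plain-Java model of the two routing modes follows; it is simplified, since real `keyBy` uses its own key-group hashing rather than `hashCode()` modulo the parallelism.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of routing control records to N parallel instances (not Flink code).
class RoutingModel {
    // Each parallel instance keeps its own local copy of the control data.
    static final class Instance {
        final Map<String, String> localTable = new HashMap<>();
    }

    final List<Instance> instances = new ArrayList<>();

    RoutingModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
    }

    // broadcast(): every instance receives the record (e.g. processElement2 of a CoProcessFunction).
    void broadcast(String key, String value) {
        for (Instance instance : instances) {
            instance.localTable.put(key, value);
        }
    }

    // Simplified key partitioning: exactly one instance receives the record.
    void partitionByKey(String key, String value) {
        int target = Math.floorMod(key.hashCode(), instances.size());
        instances.get(target).localTable.put(key, value);
    }
}
```

The model ignores checkpointing; in a real job each instance's local copy would still need to be made fault tolerant and initialized.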

 

In reply to Nick Bendtner (Tue, Jan 26, 2021 at 12:31 PM)

Re: Initializing broadcast state

Wei Jiang
In reply to this post by Guowei Ma
Hi guys,
I ran into the same problem, but I initialize the state in a different way:
```
val list = ...   // I use JDBC to fetch the initial data
val dimensionInitStream = env.fromCollection(list)
// the main stream and `dimensionStream` are streams from Flink CDC
val dimension =
dimensionStream.union(dimensionInitStream).broadcast(descriptor)
...
```
Then the main stream can connect to the broadcast state.
Hmm, I don't know why it works. What do you think?
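A likely explanation is that the bounded `fromCollection` source can emit its records right after startup, so they usually reach the broadcast side before the first main-stream record needs them. Flink does not, however, generally guarantee the relative order in which a two-input operator consumes records from its two inputs, so a main-stream record can still be processed first. The hypothetical plain-Java model below replays two possible arrival orders at one operator instance to show what a lookup sees in each case. The event type and its fields are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one operator instance consuming an interleaving of two inputs.
class UnionSeedModel {
    // isDimension == true: seed or CDC record on the broadcast side; false: main-stream record.
    static final class Event {
        final boolean isDimension;
        final String key;
        final String value;

        Event(boolean isDimension, String key, String value) {
            this.isDimension = isDimension;
            this.key = key;
            this.value = value;
        }
    }

    // Processes events in the given arrival order and returns the main-stream lookup results.
    static List<String> run(List<Event> arrivalOrder) {
        Map<String, String> dimensions = new HashMap<>(); // stands in for broadcast state
        List<String> results = new ArrayList<>();
        for (Event event : arrivalOrder) {
            if (event.isDimension) {
                dimensions.put(event.key, event.value);
            } else {
                results.add(event.value + "->" + dimensions.getOrDefault(event.key, "MISSING"));
            }
        }
        return results;
    }
}
```

If a miss is unacceptable, the seed stream could be combined with the caching approach described earlier in the thread, so that early main-stream records are parked rather than looked up.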


--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Initializing broadcast state

Guowei Ma
In reply to this post by Nick Bendtner
Hi Nick,
Below is an example (not a complete job, just to explain the idea). I use `KeyedBroadcastProcessFunction` because I saw that your code uses keyed state.
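```
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Declared as a nested class of your job class.
private static class StatefulFunctionWithKeyedStateAccessedOnBroadcast
        extends KeyedBroadcastProcessFunction<String, String, Integer, String> {

    private static final long serialVersionUID = 7496674620398203933L;

    // Keyed list state that caches main-stream elements until broadcast data is available.
    private final ListStateDescriptor<String> listStateDesc =
            new ListStateDescriptor<>("cache element", BasicTypeInfo.STRING_TYPE_INFO);

    // Placeholder descriptor for your broadcast state; pass the same one to broadcast(...).
    private final MapStateDescriptor<String, Integer> broadcastStateDesc =
            new MapStateDescriptor<>(
                    "your broadcast state", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

    @Override
    public void processBroadcastElement(Integer value, Context ctx, Collector<String> out)
            throws Exception {
        // Store the broadcast value ("latest" is a placeholder key).
        ctx.getBroadcastState(broadcastStateDesc).put("latest", value);

        // Visit the cached elements of every key on this parallel instance.
        ctx.applyToKeyedState(
                listStateDesc,
                new KeyedStateFunction<String, ListState<String>>() {
                    @Override
                    public void process(String key, ListState<String> state) throws Exception {
                        // Do the business logic with the cached elements and the broadcast value,
                        // then clear the cache for this key.
                        state.clear();
                    }
                });
    }

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        ReadOnlyBroadcastState<String, Integer> broadcastState =
                ctx.getBroadcastState(broadcastStateDesc);
        if (!broadcastState.immutableEntries().iterator().hasNext()) {
            // No broadcast data yet: cache the element in keyed state.
            getRuntimeContext().getListState(listStateDesc).add(value);
        } else {
            // Do your business logic with the broadcast state and the value.
        }
    }
}
```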
Best,
Guowei

