What's the advantage of using BroadcastState?


What's the advantage of using BroadcastState?

Paul Lam
Hi, 

AFAIK, the difference between a BroadcastStream and a normal DataStream is that a BroadcastStream comes with a BroadcastState. However, it seems that the functionality of BroadcastState can also be achieved with MapState in a CoMapFunction or something similar, since the control stream can still be broadcast without being turned into a BroadcastStream. So I'm wondering: what's the advantage of using BroadcastState? Thanks a lot!

Best Regards,
Paul Lam

Re: What's the advantage of using BroadcastState?

Hequn Cheng
Hi Paul,

There are some differences:
1. A BroadcastStream broadcasts data for you, i.e., the data is sent to all downstream tasks automatically.
2. To guarantee that the contents of the broadcast state are the same across all parallel instances of the operator, read-write access is only given to the broadcast side.
3. For BroadcastState, Flink guarantees that upon restoring/rescaling there will be no duplicates and no missing data. In case of recovery with the same or smaller parallelism, each task reads its checkpointed state. Upon scaling up, each task reads its own state, and the remaining tasks (p_new - p_old) read checkpoints of previous tasks in a round-robin manner. MapState does not offer these guarantees.
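
The restore rule in point 3 can be sketched in plain Java. This is only an illustration of the assignment described above, not Flink's restore code; the class and method names are hypothetical, and it assumes the round-robin starts at task 0.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; this is not Flink's internal restore logic.
class BroadcastRestoreSketch {

    // Returns the index of the old task whose checkpointed broadcast state
    // the new task reads. With the same or smaller parallelism, newIndex is
    // always below oldParallelism, so each task reads its own checkpoint.
    // After scaling up, the extra tasks wrap around the old tasks.
    static int sourceTaskFor(int newIndex, int oldParallelism) {
        return newIndex % oldParallelism;
    }

    // Lists the source task for every task of the rescaled operator.
    static List<Integer> assignment(int oldParallelism, int newParallelism) {
        List<Integer> sources = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            sources.add(sourceTaskFor(i, oldParallelism));
        }
        return sources;
    }

    public static void main(String[] args) {
        System.out.println(assignment(2, 5)); // [0, 1, 0, 1, 0]
        System.out.println(assignment(4, 3)); // [0, 1, 2]
    }
}
```

Because every old task holds the same contents, it does not matter which copy a new task reads; that is why no data is duplicated or lost.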

Best, Hequn


Re: What's the advantage of using BroadcastState?

Rong Rong
Hi Paul,

To add to Hequn's answer: broadcast state is typically used for "a low-throughput stream containing a set of rules which we want to evaluate against all elements coming from another stream" [1].
Another difference is that the data is "broadcast" across all keys when processing a keyed stream. This is typically needed when it is not possible to derive the same key field with a KeySelector on both connected streams.
A further difference is performance: the broadcast data is "stored locally and is used to process all incoming elements on the other stream", so its size has to be managed carefully.



Re: What's the advantage of using BroadcastState?

Paul Lam
Hi Rong, Hequn

Your answers are very helpful! Thank you!

Best Regards,
Paul Lam


Re: What's the advantage of using BroadcastState?

Fabian Hueske-2
Hi,

I've recently published a blog post about Broadcast State [1].

Cheers,
Fabian



RE: What's the advantage of using BroadcastState?

Radu Tudoran

Hi Fabian,

 

Thanks for the blog post about broadcast state. I have a question about the update capabilities of the broadcast state.

Assume you run some processing logic in the main processElement function and, at a given context marker, you 1) change a local field to 2) signal that the next time the broadcast function is triggered, a special pattern should be created and broadcast.

My question is: is such behavior allowed? Would the new special Pattern that originates in one operator instance be shared across the other instances of the KeyedBroadcastProcessFunction?

 

 

public static class PatternEvaluator
    extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {

  // local (non-state) field, set on the keyed side
  public boolean test = false;

  @Override
  public void processElement(
      Action action,
      ReadOnlyContext ctx,
      Collector<Tuple2<Long, Pattern>> out) throws Exception {

    // ... logic

    if (..whatever context) {
      test = true;
    }
  }

  @Override
  public void processBroadcastElement(
      Pattern pattern,
      Context ctx,
      Collector<Tuple2<Long, Pattern>> out) throws Exception {
    // store the new pattern by updating the broadcast state
    BroadcastState<Void, Pattern> bcState =
        ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));
    // storing in MapState with null as VOID default value
    bcState.put(null, pattern);

    // overwrite with a special pattern if the local flag was set
    if (test) {
      bcState.put(null, new Pattern(test));
    }
  }
}

 

 

Dr. Radu Tudoran


Re: What's the advantage of using BroadcastState?

Xingcan Cui
Hi Radu,

I don't fully understand your question, but I guess the answer is NO.

The broadcast state pattern just provides you with automatic data broadcasting and a bunch of map states to cache the "low-throughput" patterns. Also, to keep the states consistent, it forbids `processElement()` from modifying them. But this API does not really broadcast the states. You should keep the logic in `processBroadcastElement()` deterministic. Maybe the equation below makes the pattern clear.

<identical input> + <deterministic logic> = <identical states> = <broadcast state>
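
The equation can be checked with a small plain-Java simulation. It is only a sketch: `Instance` is a hypothetical stand-in for one parallel operator instance, and a `HashMap` plays the role of the broadcast state; none of this is Flink API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for parallel operator instances; not Flink API.
class IdenticalStateSketch {

    static class Instance {
        final Map<String, String> state = new HashMap<>();

        // Deterministic: the result depends only on the broadcast element.
        void processBroadcastElement(String ruleName, String rule) {
            state.put(ruleName, rule);
        }
    }

    // Feeds the same broadcast elements to every instance and reports
    // whether all instances end up with equal state.
    static boolean allEqualAfter(List<String[]> broadcastElements, int parallelism) {
        Instance[] instances = new Instance[parallelism];
        for (int i = 0; i < parallelism; i++) {
            instances[i] = new Instance();
            for (String[] e : broadcastElements) {
                instances[i].processBroadcastElement(e[0], e[1]);
            }
        }
        for (Instance instance : instances) {
            if (!instance.state.equals(instances[0].state)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String[]> rules = Arrays.asList(
            new String[] {"r1", "login->logout"},
            new String[] {"r2", "add->pay"},
            new String[] {"r3", "view->add"});
        System.out.println(allEqualAfter(rules, 4)); // true
    }
}
```

Nothing copies state between the instances here; they agree only because each one applies the same updates itself.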

Best,
Xingcan


Re: What's the advantage of using BroadcastState?

Fabian Hueske-2
Hi,

Xingcan is right. There is no hidden state synchronization happening.
You have to ensure that the broadcast state is the same at every parallel instance. Hence, it should only be modified by the processBroadcastElement() method, which receives the same broadcast elements on all task instances.
The API tries to help users not to violate the contract, but it is not bulletproof. Side-passing information in a local variable (as you suggested) cannot be prevented and would lead to inconsistencies.
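
A minimal plain-Java sketch of how such a side channel makes the instances diverge. The `Instance` class, the string-typed elements and the "marker" value are hypothetical stand-ins chosen for illustration, not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for two parallel operator instances; not Flink API.
class SideChannelSketch {

    static class Instance {
        final Map<String, String> state = new HashMap<>();
        boolean test = false; // instance-local field, never shared

        // Keyed side: each instance only sees the elements of its own keys.
        void processElement(String action) {
            if (action.equals("marker")) {
                test = true;
            }
        }

        // Broadcast side: every instance receives the same pattern, but the
        // local flag makes the stored value depend on the instance.
        void processBroadcastElement(String pattern) {
            state.put("pattern", test ? "special" : pattern);
        }
    }

    public static void main(String[] args) {
        Instance a = new Instance();
        Instance b = new Instance();
        a.processElement("marker"); // only instance a receives the marker
        b.processElement("click");
        a.processBroadcastElement("p1");
        b.processBroadcastElement("p1");
        System.out.println(a.state); // {pattern=special}
        System.out.println(b.state); // {pattern=p1}
        System.out.println(a.state.equals(b.state)); // false
    }
}
```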

Best, Fabian




FW: What's the advantage of using BroadcastState?

Radu Tudoran

Thanks for the explanation. I suspected it might be like this and wanted to double-check before building inconsistent programs :)

Would it be interesting for the community to also have something that can share/broadcast items from one task to the other tasks? Spark, for example, has this as its broadcast behavior, and it can be useful for different algorithms and applications. For example, if one builds a clustering algorithm that runs in parallel, one task could update the cluster centroids and then advertise them to the others (just one example).

I am thinking we can achieve this using a combination of the broadcast stream and the iterative operators. Basically, if a task makes an update to the state, this value should be pushed back (upstream) so that afterwards it can be broadcast to all the tasks using it.
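
A rough plain-Java sketch of that idea, only to make the data flow concrete. The queue stands in for the feedback (iteration) edge and all names are hypothetical; it does not use Flink's iteration API and says nothing about checkpointing or ordering.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Plain-Java illustration of "push the update upstream, then broadcast it".
class FeedbackBroadcastSketch {

    static class Task {
        final List<String> state = new ArrayList<>();

        // The only place where a task changes its copy of the shared state.
        void processBroadcastElement(String update) {
            state.add(update);
        }
    }

    final List<Task> tasks = new ArrayList<>();
    final Queue<String> feedback = new ArrayDeque<>(); // stand-in for the feedback edge

    FeedbackBroadcastSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new Task());
        }
    }

    // A task proposes an update without touching its own state directly.
    void propose(String update) {
        feedback.add(update);
    }

    // The feedback edge delivers every queued update to all tasks.
    void drainFeedback() {
        while (!feedback.isEmpty()) {
            String update = feedback.poll();
            for (Task task : tasks) {
                task.processBroadcastElement(update);
            }
        }
    }

    public static void main(String[] args) {
        FeedbackBroadcastSketch job = new FeedbackBroadcastSketch(3);
        job.propose("centroid-1=(2.0,3.5)"); // e.g. proposed by one task
        job.drainFeedback();
        for (Task task : job.tasks) {
            System.out.println(task.state); // [centroid-1=(2.0,3.5)]
        }
    }
}
```

The point of the design is that the proposing task never writes locally; it sees its own update only when it comes back through the broadcast side, like every other task.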

 

Let me know what you think.

 

Dr. Radu Tudoran
