Thread access and broadcast state initialization in BroadcastProcessFunction


KristoffSC
Hi,
I was playing around with BroadcastProcessFunction and I've observed some
specific behavior.

My setup:

MapStateDescriptor<Void, ProcessingRule> ruleStateDescriptor = new
MapStateDescriptor<>(
                "RulesBroadcastState",
                Types.VOID,
                TypeInformation.of(new TypeHint<ProcessingRule>() {
                }));

BroadcastStream<ProcessingRule> processingRulesBroadcastStream =
processingRulesStream
               .broadcast(ruleStateDescriptor);


SingleOutputStreamOperator<EvaluatedTransaction> evaluatedTrades =
enrichedTransactionStream
                .connect(processingRulesBroadcastStream)
                .process(new DriveEngineRuleOperator())
                .name("Drive Rule Evaluation");

Here DriveEngineRuleOperator extends BroadcastProcessFunction and
implements the open, processElement and processBroadcastElement methods.

I was following Flink's tutorials about the broadcast state pattern and my
"open" method looks like this:

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Same name and types as the descriptor passed to broadcast()
        processingRulesDesc = new MapStateDescriptor<>(
                "RulesBroadcastState",
                Types.VOID,
                TypeInformation.of(new TypeHint<ProcessingRule>() {
                }));
    }


I've noticed that all methods are called by the same thread. Will that
always be the case, or could those methods be called by different threads?

The second thing I've noticed is that the "open" method was executed only
just before the first "fast stream" element was received (before the first
call to processElement). That means that if I receive a control stream
element (a broadcast stream element) first, open would not have been called
yet, my processing rule descriptor would not be initialized, and I would
lose the event.

What are the good practices in this case?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Thread access and broadcast state initialization in BroadcastProcessFunction

vino yang
Hi KristoffSC,

>> I've noticed that all methods are called by the same thread. Will that
>> always be the case, or could those methods be called by different threads?

No, they will not be called by different threads. The open/processXXX/close
methods are called at different stages of a single task thread's life cycle,
and the framework must preserve the call order.

>> The second thing I've noticed is that the "open" method was executed only
>> just before the first "fast stream" element was received (before the first
>> call to processElement). That means that if I receive a control stream
>> element (a broadcast stream element) first, open would not have been called
>> yet, my processing rule descriptor would not be initialized, and I would
>> lose the event.

There is a similar question that I took part in, which you may want to look at [1].
There is also another similar question on StackOverflow [2].

Best,
Vino


On Wed, Dec 11, 2019 at 5:56 AM, KristoffSC <[hidden email]> wrote:
> Hi,
> I was playing around with BroadcastProcessFunction and I've observed some
> specific behavior.
> [...]

Re: Thread access and broadcast state initialization in BroadcastProcessFunction

KristoffSC
Hi Vino,
Thank you for your response and provided links.

Just to clarify, and a small follow-up:

1. The methods will be called by only one thread, right?

2. The links you provided tackle the case where a "fast stream" element
arrives before any broadcast stream element. In my case the broadcast
element arrived first, before any "fast stream" element. Because the open
method had not been called (I've observed that it is called only before the
first processElement call, i.e. before processing the first "fast stream"
element), we don't have the state descriptor that would be initialized in
open. So we cannot actually store or process this broadcast element in our
broadcast state.


    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        processingRulesDesc = new MapStateDescriptor<>(
                "RulesBroadcastState",
                Types.VOID,
                TypeInformation.of(new TypeHint<ProcessingRule>() {
                }));
    }

In this case, bcState will be null since the open method has not been called yet:

    @Override
    public void processBroadcastElement(ProcessingRule rule, Context ctx,
            Collector<EvaluatedTransaction> out) throws Exception {
        // store the new rule by updating the broadcast state
        BroadcastState<Void, ProcessingRule> bcState =
                ctx.getBroadcastState(processingRulesDesc);
        bcState.put(null, rule);
    }
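
For reference, one way to avoid depending on open() for the descriptor at all
is to create it as a field of the function, which is how the broadcast state
examples in the Flink documentation declare it. The sketch below uses
plain-Java stand-ins only (Descriptor, InitInOpen and InitAsField are
hypothetical names, not Flink classes) to show the difference between the two
styles. In a real job this would rely on MapStateDescriptor being
serializable, since the function instance is serialized and shipped to the
tasks.

```java
public class DescriptorFieldSketch {

    /** Hypothetical stand-in for a state descriptor; not Flink's MapStateDescriptor. */
    static final class Descriptor {
        final String name;

        Descriptor(String name) {
            this.name = name;
        }
    }

    /** Variant A: the descriptor is created in open(), so it is null until open() has run. */
    static final class InitInOpen {
        Descriptor desc;

        void open() {
            desc = new Descriptor("RulesBroadcastState");
        }
    }

    /** Variant B: the descriptor is created together with the object and never depends on open(). */
    static final class InitAsField {
        final Descriptor desc = new Descriptor("RulesBroadcastState");

        void open() {
            // nothing to initialize here
        }
    }

    public static void main(String[] args) {
        InitInOpen a = new InitInOpen();
        InitAsField b = new InitAsField();

        // Before open(): only variant B already has a usable descriptor.
        System.out.println("A has descriptor before open(): " + (a.desc != null));
        System.out.println("B has descriptor before open(): " + (b.desc != null));

        a.open();
        b.open();

        // After open(): both variants hold a descriptor with the same name.
        System.out.println("A after open(): " + a.desc.name);
        System.out.println("B after open(): " + b.desc.name);
    }
}
```

With the field variant, processBroadcastElement can use the descriptor no
matter which element arrives first.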







Re: Thread access and broadcast state initialization in BroadcastProcessFunction

Timo Walther
1. Yes, the methods will only be called by one thread. The Flink API aims to
abstract all concurrency concerns away when you use the provided methods and
state.

2. The open() method should always be the first method called. If this is
not the case, it is definitely a bug. Which Flink version are you using? If
it is 1.9, could you verify the behavior with 1.8? The community recently
simplified the architecture under the hood.
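
To illustrate both points, here is a minimal plain-Java sketch of the
contract. It is not Flink's actual implementation: LifecycleSketch,
RecordingFunction and runTask are hypothetical names made up for this example,
and the sketch only assumes that a single task thread calls open() first, then
the process methods, then close().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LifecycleSketch {

    /** Hypothetical user function that records which method ran on which thread. */
    static final class RecordingFunction {
        final List<String> calls = new ArrayList<>();

        private void record(String method) {
            calls.add(method + "@" + Thread.currentThread().getName());
        }

        void open() { record("open"); }
        void processElement(String element) { record("processElement"); }
        void processBroadcastElement(String rule) { record("processBroadcastElement"); }
        void close() { record("close"); }
    }

    /**
     * Runs the whole life cycle on one dedicated thread, in a fixed order:
     * open(), then one process call per input, then close().
     * Inputs starting with "rule:" are treated as broadcast elements.
     */
    static List<String> runTask(List<String> inputs) {
        RecordingFunction fn = new RecordingFunction();
        Thread task = new Thread(() -> {
            fn.open();
            for (String in : inputs) {
                if (in.startsWith("rule:")) {
                    fn.processBroadcastElement(in);
                } else {
                    fn.processElement(in);
                }
            }
            fn.close();
        }, "task-thread");
        task.start();
        try {
            // join() also makes the recorded calls visible to the caller
            task.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the task", e);
        }
        return fn.calls;
    }

    public static void main(String[] args) {
        // A broadcast ("rule:") element arrives first, before any regular element.
        for (String call : runTask(Arrays.asList("rule:r1", "tx-1", "tx-2"))) {
            System.out.println(call);
        }
    }
}
```

Even though the broadcast element arrives first here, every recorded call
carries the same thread name and open is the first entry.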

Thanks for your feedback.

Regards,
Timo

On 11.12.19 10:41, KristoffSC wrote:

> Hi Vino,
> Thank you for your response and provided links.
>
> So just to clarify and small follow up.
>
> 1. Methods will be called only by one thread right?
>
> 2. The links you provided are tackling a case when we got a "fast stream"
> element before we received broadcast stream element. In my case we had
> Broadcast element first, before we got any "fast stream" element. Because
> open method was not called (I've observed it will be called only before
> first processElement method call, so before processing the first "fast
> stream" element) we don't have the state descriptor which would be
> initialized in open method. So we actually cannot "store/process" this
> broadcast element in our broadcast state.
>
>
>   @Override
>      public void open(Configuration parameters) throws Exception {
>          super.open(parameters);
>          processingRulesDesc = new MapStateDescriptor<>(
>                  "RulesBroadcastState",
>                  Types.VOID,
>                  TypeInformation.of(new TypeHint<ProcessingRule>() {
>                  }));
>
>
>      }
>
> In this case, bcState  will be null since open method was not yet called.
>   public void processBroadcastElement(ProcessingRule rule, Context ctx,
> Collector<EvaluatedTransaction> out) throws Exception {
>          // store the new pattern by updating the broadcast state
>          BroadcastState<Void, ProcessingRule> bcState =
> ctx.getBroadcastState(processingRulesDesc);
>          bcState.put(null, rule);
>      }
>
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Reply | Threaded
Open this post in threaded view
|

Re: Thread access and broadcast state initialization in BroadcastProcessFunction

KristoffSC
Thank you for your reply Timo.

Regarding point 2: I'm sorry for the delay. I reran my test and everything
seems to be in order. The open method was called first. I guess it was a
false alarm. Sorry for that.

Regards,
Krzysztof


