Rich Function Thread Safety

Aaron Langford
Hello Flink Community,

I'm hoping to verify some understanding:

If I have a function with managed state, I'm wondering if a checkpoint will ever be taken while a function is mutating state. I'll try to illustrate the situation I'm hoping to be safe from:

Happy Path:
t0 -> processFunction invoked with el1
t1 -> set A to 5
t2 -> set B to 10
t3 -> function returns

Unhappy path:
t0 -> processFunction invoked with el1
t1 -> set A to 5
t2 -> function interrupted, checkpoint taken (A = 5, B = 1)
t3 -> set B to 10
t4 -> function returns
...
tn -> Flink application fails, restart from previous checkpoint (A = 5, B = 1)
tn+1 -> recovery begins somewhere, but state is torn anyway, so we're going to have a bad time

I don't think this could happen given that checkpoints effectively are messages in the pipeline, and the checkpoint is only taken when an operator sees the checkpoint barrier.
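Aaron's reasoning can be illustrated with a toy, single-threaded model. It is plain Java with no Flink dependency, and the class name, the `BARRIER` sentinel and the method names are hypothetical. Because records and barriers arrive in one ordered sequence and are handled one at a time, a snapshot sees either all of an element's updates or none of them.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Flink's implementation: data records and checkpoint
// barriers travel through one ordered input and are handled one at a time.
final class BarrierStreamDemo {
    static final long BARRIER = Long.MIN_VALUE; // hypothetical barrier marker

    // Operator "state": two values that must always change together.
    private long a = 0;
    private long b = 1;
    final List<long[]> snapshots = new ArrayList<>();

    // Mirrors the happy path: set A, then set B, then return.
    private void processElement(long value) {
        a = value;
        b = value * 2;
    }

    // A barrier is only handled between two processElement calls, so a
    // snapshot can never capture a half-applied update such as (5, 1).
    void run(long[] input) {
        for (long event : input) {
            if (event == BARRIER) {
                snapshots.add(new long[] {a, b});
            } else {
                processElement(event);
            }
        }
    }
}
```

With the input `5, BARRIER, 7, 8, BARRIER`, the snapshots are `(5, 10)` and `(8, 16)`; the torn `(5, 1)` state from the unhappy path cannot be captured in this model.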

Hoping to make sure this is correct!

Aaron

Re: Rich Function Thread Safety

Zhu Zhu
Hi Aaron,

It is thread safe since the state snapshot happens in the same thread as the user function.
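Zhu Zhu's point can be sketched with a plain-Java toy model (the class and method names are hypothetical, and Flink's real task implementation is far more involved). If every call that reads or writes operator state runs on one dedicated thread, a snapshot can never interleave with a half-finished `processElement`, even when the requests come from several threads. This is roughly the idea behind the mailbox-based task threading model in newer Flink versions, but the sketch does not reproduce that code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy "mailbox": all state access is funnelled through one thread.
final class MailboxOperator {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private long a = 0;
    private long b = 0;
    private final List<long[]> snapshots = new ArrayList<>(); // mailbox-thread only

    // May be called from any thread; the update itself runs on the mailbox thread.
    void processElement(long value) {
        mailbox.execute(() -> {
            a = value;
            b = value * 2;
        });
    }

    // Also runs on the mailbox thread, so it sees all or none of an update.
    void snapshotState() {
        mailbox.execute(() -> snapshots.add(new long[] {a, b}));
    }

    // Copies the snapshots on the mailbox thread and waits for the result.
    List<long[]> snapshots() {
        try {
            return mailbox.submit(() -> new ArrayList<long[]>(snapshots)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void close() {
        mailbox.shutdown();
    }
}
```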

Thanks,
Zhu Zhu

In reply to Aaron Langford <[hidden email]>, Thu, Dec 19, 2019, 11:25 AM.

Re: Rich Function Thread Safety

Joey Echeverria
I’ve seen a few mailing list posts (including this one) that say Flink guarantees there is no concurrent access to operator methods (e.g. flatMap, snapshotState, etc.) and thus synchronization isn’t needed when writing operators that support checkpointing. I was trying to find a place in the official docs where this was called out, but was coming up empty.

Is there a section of the docs that covers this topic?

Thanks!

-Joey

In reply to Zhu Zhu <[hidden email]>, Dec 18, 2019, 9:38 PM.

Re: Rich Function Thread Safety

tao xiao
As the Javadoc suggests, it seems that operator methods and checkpoint snapshots are accessed by two different threads.
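Even if the snapshot were triggered from a different thread, as tao xiao reads the Javadoc, the two calls can still be kept from overlapping by mutual exclusion. A minimal sketch, assuming a single lock object shared by both code paths; the names are hypothetical, although older Flink versions exposed a comparable lock to legacy sources through `SourceFunction.SourceContext#getCheckpointLock()`.

```java
// Toy model: one thread processes, another snapshots, but both hold the
// same lock while touching state, so a snapshot never sees a torn update.
final class LockedOperator {
    private final Object checkpointLock = new Object(); // hypothetical shared lock
    private long a = 0;
    private long b = 0;

    void processElement(long value) {
        synchronized (checkpointLock) {
            a = value;
            b = value * 2;
        }
    }

    long[] snapshotState() {
        synchronized (checkpointLock) {
            return new long[] {a, b};
        }
    }
}
```

The lock makes the two threads take turns; a single-threaded design, as described in the other replies, reaches the same guarantee without locking.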


In reply to Joey Echeverria <[hidden email]>, Thu, May 7, 2020, 1:22 AM.

--
Regards,
Tao

Re: Rich Function Thread Safety

Tzu-Li (Gordon) Tai
In reply to this post by Joey Echeverria
As others have mentioned already, it is true that method calls on operators (e.g. processing events and snapshotting state) will not happen concurrently.

As for your findings from reading through the documentation, that might be a hint that we could add a bit more explanation about this. Could you suggest where you would expect to see it mentioned, based on your read-through?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Rich Function Thread Safety

Lian Jiang
Hi,

I am wondering whether the invoke function is thread safe for:

final int seen = count.getOrDefault(0);
count.set(seen + 1);
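The snippet is a read-modify-write, so it is only safe if no other invocation for the same key can run between the read and the write. The deterministic sketch below shows what would go wrong otherwise. It uses a hypothetical `ToyValue` class with the same `getOrDefault`/`set` shape as the snippet; it is not StateFun's `PersistedValue`.

```java
// Hypothetical stand-in for a persisted value (NOT the StateFun API).
final class ToyValue {
    private Integer value; // null means "unset"

    int getOrDefault(int defaultValue) {
        return value == null ? defaultValue : value;
    }

    void set(int newValue) {
        value = newValue;
    }
}

final class LostUpdateDemo {
    // One invocation at a time: each read sees the previous write.
    static int serial() {
        ToyValue count = new ToyValue();
        for (int i = 0; i < 2; i++) {
            final int seen = count.getOrDefault(0);
            count.set(seen + 1);
        }
        return count.getOrDefault(0);
    }

    // Two invocations interleaved by hand: both read before either writes,
    // the "lost update" that unsynchronized concurrent access would allow.
    static int interleaved() {
        ToyValue count = new ToyValue();
        final int seenByFirst = count.getOrDefault(0);
        final int seenBySecond = count.getOrDefault(0);
        count.set(seenByFirst + 1);
        count.set(seenBySecond + 1);
        return count.getOrDefault(0);
    }
}
```

Run serially, the count reaches 2; interleaved, it ends at 1 even though two invocations ran.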

From https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/concepts/logical.html
"When an application starts, each parallel worker of the framework will create one physical object per function type."

It sounds like one function can be invoked by multiple workers at the same time. The tutorial example indicates that the persistedValue can be process safe (across multiple workers) and thread safe (inside a worker, e.g. with a timer callback).


Could you please add some clarification on the questions below?
1. What is the design (briefly) for process/thread safety of persisted state?
2. Is there any scenario where developers need to worry about process/thread safety when using state?
3. Can I consider stateful functions as Flink operators, so that all operator-related theories can be applied to stateful functions?
4. Similarly, can we apply all theories of DataStream state to StateFun's state?
Much appreciated!

Thanks
Lian

In reply to Tzu-Li (Gordon) Tai <[hidden email]>, Sun, May 10, 2020, 9:33 PM.

Re: Rich Function Thread Safety

Igal Shilman
Hi Lian,

Good to hear that you are learning about StateFun, and I'd be happy to answer any of your questions while doing so :-)
In the future, it would be best to start a new email thread, so that your question is easier to spot.

The following is completely thread safe:
final int seen = count.getOrDefault(0);
count.set(seen + 1);
The simple reason is that the functions are invoked one by one on a single OS thread, and different OS threads do not share function instances between them. In addition, each OS thread owns a chunk of keys that only it can invoke.
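Igal's description (one OS thread per function instance, each thread owning a chunk of keys) can be sketched as a toy model. The assumptions: plain Java, a hypothetical `KeyPartitionedWorkers` class, and a simple hash-modulo key assignment, whereas Flink actually assigns keys to parallel instances through key groups. The increment is the same read-modify-write as in Lian's snippet, yet no locks are needed because a key is only ever touched by its owner thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model of key ownership: N single-threaded workers, each with private state.
final class KeyPartitionedWorkers {
    private final ExecutorService[] workers;
    private final Map<String, Integer>[] stateByWorker;

    @SuppressWarnings("unchecked")
    KeyPartitionedWorkers(int n) {
        workers = new ExecutorService[n];
        stateByWorker = new Map[n];
        for (int i = 0; i < n; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
            stateByWorker[i] = new HashMap<>();
        }
    }

    // A key always maps to the same worker (hypothetical hash-modulo scheme).
    private int owner(String key) {
        return Math.floorMod(key.hashCode(), workers.length);
    }

    // Read-modify-write runs only on the key's owner thread.
    void invoke(String key) {
        int w = owner(key);
        Map<String, Integer> state = stateByWorker[w];
        workers[w].execute(() -> {
            final int seen = state.getOrDefault(key, 0);
            state.put(key, seen + 1);
        });
    }

    // Reads the count on the owner thread and waits for the answer.
    int count(String key) {
        int w = owner(key);
        Map<String, Integer> state = stateByWorker[w];
        try {
            return workers[w].submit(() -> state.getOrDefault(key, 0)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void close() {
        for (ExecutorService w : workers) w.shutdown();
    }
}
```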

2. Is there any scenario where developers need to worry about process/thread safety when using state?
A few things here:
* Do not share mutable static variables without synchronization.
* Try to minimize or avoid long blocking calls. Use the asynchronous API where applicable.
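A minimal sketch of the first point, in plain Java with hypothetical names: instance fields of a function are touched only by the thread that owns the instance, but a `static` field is shared by every instance in the JVM, so it needs an atomic type (or a lock).

```java
import java.util.concurrent.atomic.AtomicLong;

// Each worker thread gets its own instance, so instance fields are
// thread-confined; a static field is shared by all instances and threads.
final class CountingFunction {
    // Shared across every instance and every worker thread: must be atomic.
    static final AtomicLong TOTAL_INVOCATIONS = new AtomicLong();

    // Confined to the single thread that owns this instance: a plain field is fine.
    private long localInvocations = 0;

    void invoke() {
        localInvocations++;
        TOTAL_INVOCATIONS.incrementAndGet();
    }

    long localInvocations() {
        return localInvocations;
    }
}
```

Replacing the `AtomicLong` with a plain `static long` incremented with `++` would make lost updates possible, because that increment is not atomic.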

3. Can I consider stateful functions as Flink operators, so that all operator-related theories can be applied to stateful functions?

Absolutely yes. StateFun is built on top of the DataStream API plus some internal bits.

4. Similarly, can we apply all theories of DataStream state to StateFun's state?
I'm not sure what you mean by that, but by and large, yes. The main difference is that we don't support state evolution with arbitrary state types; we strictly require Protocol Buffers for that.

Good luck,
Igal.

In reply to Lian Jiang <[hidden email]>, Sun, Oct 25, 2020, 7:43 PM.