Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state


Chao Wang
Hi,

I'd like to know whether CoFlatMapFunction/CoProcessFunction is thread-safe, and to what extent. What's the difference between the two functions? And, in general, how does Flink prevent race conditions? Here's my case:

I tried to condition on two input streams and produce a third stream if the condition is met. I implemented a CoFlatMapFunction and tried to track state using a field in the implementing class (I want to isolate my application from the checkpointing feature, and therefore I do not use the state described here: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html). The field served as a flag indicating whether there was pending data from either input stream; if so, that data was processed together with the newly arriving data from the other input stream (the processing invokes a native function).
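For illustration, the field-based approach described above might look roughly like the following. This is a minimal plain-Java model, not Flink code: in a real job the class would implement `CoFlatMapFunction<String, Integer, String>` and emit through a `Collector<String>`; here a `java.util.function.Consumer` stands in for the collector so the sketch runs without Flink on the classpath. The element types, the single-slot buffering, and `combine()` (standing in for the native call) are assumptions.

```java
import java.util.function.Consumer;

// Plain-Java model of a two-input function that buffers a pending element
// in ordinary fields. Hypothetical: types and combine() are stand-ins.
class PendingPairer {
    // The "flag" from the post: at most one pending element per input, or null.
    private String pendingLeft;
    private Integer pendingRight;

    // Called for each element of input 1.
    void flatMap1(String left, Consumer<String> out) {
        if (pendingRight != null) {
            out.accept(combine(left, pendingRight)); // partner already waiting
            pendingRight = null;
        } else {
            pendingLeft = left; // wait for input 2 (replaces an older pending element)
        }
    }

    // Called for each element of input 2.
    void flatMap2(Integer right, Consumer<String> out) {
        if (pendingLeft != null) {
            out.accept(combine(pendingLeft, right));
            pendingLeft = null;
        } else {
            pendingRight = right;
        }
    }

    // Stand-in for the processing that, in the post, calls into native code.
    private static String combine(String left, int right) {
        return left + ":" + right;
    }

    public static void main(String[] args) {
        PendingPairer fn = new PendingPairer();
        Consumer<String> out = System.out::println;
        fn.flatMap1("a", out); // buffered, nothing emitted yet
        fn.flatMap2(1, out);   // prints a:1
    }
}
```

Because each parallel instance holds its own `pendingLeft`/`pendingRight`, this buffering is per subtask, and plain fields like these are not part of Flink's checkpoints, which matches the stated goal of staying independent of the state API.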

But then I got a double-free error and a segmentation fault, which I believe were due to unintended concurrent access to the native function. I then tried wrapping the access in a synchronized method, and also explicitly locking/unlocking in the flatMap1/flatMap2 methods, but all in vain: the error remained.

I considered using CoProcessFunction instead, but it seems to me that it cannot hold custom internal state, since its javadoc states: "The context [CoProcessFunction.Context] is only valid during the invocation of this method, do not store it."



Thanks,
Chao

Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state

Nico Kruber
Hi Chao,

1) Regarding the difference between CoFlatMapFunction and CoProcessFunction, let
me quote the javadoc of CoProcessFunction:

"Contrary to the {@link CoFlatMapFunction}, this function can also query the
time (both event and processing) and set timers, through the provided {@link
Context}. When reacting to the firing of set timers the function can emit yet
more elements."

So, IMHO, the two offer different levels of abstraction and control (high- vs.
low-level). Also note the different methods available for you to implement.
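To make the difference concrete, the sketch below models in plain Java what the extra context and `onTimer(...)` make possible: an element from input 1 waits for a partner from input 2, and a timer emits a timeout record if none arrives in time. Everything here (`TimerCtx`, `TimeoutPairer`, `TimerDriver`, the watermark-driven firing) is a hypothetical model, not Flink's implementation; in Flink the context passed to `processElement1/2` exposes the element's `timestamp()` and a `TimerService`, and fired timers are delivered to `onTimer(long, OnTimerContext, Collector<OUT>)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical per-call context: the element's timestamp plus timer registration.
final class TimerCtx {
    private final long timestamp;
    private final PriorityQueue<Long> timers;

    TimerCtx(long timestamp, PriorityQueue<Long> timers) {
        this.timestamp = timestamp;
        this.timers = timers;
    }

    long timestamp() { return timestamp; }
    void registerEventTimeTimer(long time) { timers.add(time); }
}

// Pairs input 1 with the next input 2, or emits a timeout from onTimer()
// if no partner arrived within TIMEOUT time units.
final class TimeoutPairer {
    static final long TIMEOUT = 10;
    private String pending;      // state lives in a field, not in the context
    private long pendingSince;

    void processElement1(String value, TimerCtx ctx, List<String> out) {
        pending = value;
        pendingSince = ctx.timestamp();
        ctx.registerEventTimeTimer(pendingSince + TIMEOUT);
    }

    void processElement2(String value, TimerCtx ctx, List<String> out) {
        if (pending != null) {
            out.add(pending + "+" + value);
            pending = null;
        }
    }

    // What a flatMap-style function cannot do: emit because time has passed.
    void onTimer(long time, List<String> out) {
        if (pending != null && time >= pendingSince + TIMEOUT) {
            out.add("timeout:" + pending);
            pending = null;
        }
    }
}

// Single-threaded driver: element callbacks and timer callbacks never overlap.
final class TimerDriver {
    final TimeoutPairer fn = new TimeoutPairer();
    final PriorityQueue<Long> timers = new PriorityQueue<>();
    final List<String> out = new ArrayList<>();

    void input1(String v, long ts) { fn.processElement1(v, new TimerCtx(ts, timers), out); }
    void input2(String v, long ts) { fn.processElement2(v, new TimerCtx(ts, timers), out); }

    // Fires, in time order, every registered timer at or before the watermark.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fn.onTimer(timers.poll(), out);
        }
    }

    public static void main(String[] args) {
        TimerDriver d = new TimerDriver();
        d.input1("a", 0);
        d.input2("x", 3);       // matched: a+x
        d.input1("b", 5);       // no partner yet; timer at 15
        d.advanceWatermark(20); // timers 10 and 15 fire; only 15 times out "b"
        System.out.println(d.out); // [a+x, timeout:b]
    }
}
```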

2a) In general, Flink calls functions on a per-record base in a serialized
fashion per task. If a TaskManager runs several tasks (e.g. because it has
multiple slots), each task uses its own function object, so you should only
get into trouble if you share static references. Otherwise, you do not need
to worry about thread safety.

2b) From what I see in the code (StreamTwoInputProcessor), the same should
apply to CoFlatMapFunction and CoProcessFunction, so flatMap1/2 and
processElement1/2 are never called in parallel!
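A minimal plain-Java model of points 2a and 2b, assuming one thread per parallel subtask: each subtask gets its own function object and calls `flatMap1`/`flatMap2` strictly one after another, so the unsynchronized counter field never sees concurrent access. `CountingCoFunction` and `SubtaskModel` are hypothetical names; Flink's actual task threads and `StreamTwoInputProcessor` are considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical two-input function with a plain, unsynchronized field.
final class CountingCoFunction {
    private long seen; // safe: only this subtask's thread ever touches it

    void flatMap1(int value) { seen++; }
    void flatMap2(int value) { seen++; }
    long seen() { return seen; }
}

// Model of "one function object per parallel subtask, one element at a time".
final class SubtaskModel {
    static List<Long> run(int parallelism, int elementsPerInput) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 0; i < parallelism; i++) {
                Callable<Long> subtask = () -> {
                    CountingCoFunction fn = new CountingCoFunction(); // own instance
                    for (int e = 0; e < elementsPerInput; e++) {
                        // Inputs interleave, but calls run sequentially on this
                        // thread, so flatMap1 and flatMap2 never overlap here.
                        fn.flatMap1(e);
                        fn.flatMap2(e);
                    }
                    return fn.seen();
                };
                futures.add(pool.submit(subtask));
            }
            List<Long> counts = new ArrayList<>();
            for (Future<Long> f : futures) {
                counts.add(f.get());
            }
            return counts;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(4, 100_000)); // each of the 4 subtasks reports 200000
    }
}
```

The counts come out exact without any locking; replacing the instance field with a `static` counter shared by all subtasks would reintroduce exactly the kind of race described in 2a.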

3) Why would you want to store the CoProcessFunction.Context?


Nico

On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:




Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state

Chao Wang
Thank you, Nico! That helps me a lot!

2a) That really clarifies my understanding of Flink. Yes, I think I was
effectively sharing static state: I invoked a native function (implemented
through JNI), which I believe has only one instance per process. I guess the
Java synchronization mechanisms failed because each parallel task had its own
function object at runtime, and therefore its own lock object. I now use a
C++ mutex inside the native function, and that resolves my case.

BTW, could you elaborate a bit on what you mean by "per-record base"? What
exactly do you mean by a record?

3) I do not intend to store the CoProcessFunction.Context. I was just
thinking that, since the documentation says it is only valid during the
invocation, I cannot use it to maintain custom state for my program logic.
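A small hypothetical model of that distinction: custom state lives in an ordinary field of the function object, and the context is only read while the callback runs. Flink does not necessarily throw when a stored context is used later; the operator may reuse the same context object for subsequent elements, so a stale reference could silently report another element's data. This model makes such misuse fail loudly instead. `CallScopedContext`, `MaxTimestampTracker`, and `ContextDriver` are made-up names.

```java
// Hypothetical context that is valid only while one callback is running,
// modeled after the javadoc warning "do not store it".
final class CallScopedContext {
    private final long timestamp;
    private boolean valid = true;

    CallScopedContext(long timestamp) { this.timestamp = timestamp; }

    long timestamp() {
        if (!valid) {
            throw new IllegalStateException("context used outside of its invocation");
        }
        return timestamp;
    }

    void invalidate() { valid = false; }
}

// Custom state lives in an ordinary field; the context is only read during the call.
final class MaxTimestampTracker {
    private long maxSeen = Long.MIN_VALUE;

    void processElement1(String value, CallScopedContext ctx) {
        maxSeen = Math.max(maxSeen, ctx.timestamp());
    }

    long maxSeen() { return maxSeen; }
}

final class ContextDriver {
    // Hands a fresh context to the function and invalidates it afterwards.
    static void deliver(MaxTimestampTracker fn, String value, long timestamp) {
        CallScopedContext ctx = new CallScopedContext(timestamp);
        try {
            fn.processElement1(value, ctx);
        } finally {
            ctx.invalidate();
        }
    }

    public static void main(String[] args) {
        MaxTimestampTracker fn = new MaxTimestampTracker();
        deliver(fn, "a", 5);
        deliver(fn, "b", 3);
        System.out.println(fn.maxSeen()); // 5
    }
}
```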


Thank you,
Chao


On 08/16/2017 03:31 AM, Nico Kruber wrote:



Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state

Nico Kruber
Hi Chao,

what I meant by "per-record base" should actually have been "per-event base"
(event = one element of whatever the stream contains). As the API suggests,
processing happens one event at a time, and this is also how it is performed
internally.

Nico

On Thursday, 17 August 2017 05:06:07 CEST Chao Wang wrote:


