Using multithreaded library within ProcessFunction with callbacks relying on the out parameter


Salva Alcántara
I am working on a `CoProcessFunction` that uses a third-party library for
detecting certain patterns of events based on some rules. So, in the end,
the `processElement1` method basically forwards the events to this
library and registers a callback so that, when a match is detected, the
`CoProcessFunction` can emit an output event. To achieve this, the callback
relies on a reference to the `out: Collector[T]` parameter of
`processElement1`.

Having said that, I am not sure whether this use case is well supported by
Flink, since:

1. The third-party library might spawn multiple threads (I do not have any
control over the number of threads spawned; this is decided by the library).
2. I am not sure whether Flink might recreate `out` at some point,
invalidating the references held by the callbacks and making them crash.

So far I have not observed any issues, but I have only run my program at
small scale. It would be great to hear from the experts whether my approach is
valid or not.

PS: Also posted in
https://stackoverflow.com/questions/60181678/using-multithreaded-library-within-processfunction-with-callbacks-relying-on-the



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Yun Gao
Hi Salva,

As far as I know:

1. `out: Collector[T]` does not support multi-threaded access; only one thread may write records at a time. If multiple threads use `out`, access needs to be coordinated in some way (for example, with a lock, or with a queue that buffers records so that a single thread emits them via `out`).
2. With the current implementation, `out` is not recreated. However, I think that is implementation-dependent. If the processing logic still happens in the `processElement` method, is it possible to always use the `out` object passed into the method?

Best,
Yun
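Yun's queue-based option (callback threads only enqueue, a single thread drains and emits) can be sketched roughly as follows. This is a minimal, hypothetical model rather than Flink code: `Collector` is a stand-in trait for Flink's `Collector[T]`, and `EmitBuffer`, `offer` and `drainTo` are illustrative names. It assumes `drainTo` is only ever called from the task thread, for example at the start of `processElement1`/`processElement2` or from a timer callback.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Stand-in for Flink's Collector[T], which must only be used by one thread.
trait Collector[T] { def collect(record: T): Unit }

// Hypothetical helper: library callback threads only enqueue results;
// the task thread drains them and is the only thread that touches `out`.
final class EmitBuffer[T] {
  private val pending = new ConcurrentLinkedQueue[T]()

  // Safe to call from any thread, e.g. a third-party library callback.
  def offer(record: T): Unit = { pending.add(record); () }

  // Call from the task thread only: emits everything buffered so far
  // and returns the number of records emitted.
  def drainTo(out: Collector[T]): Int = {
    var emitted = 0
    var next = pending.poll()
    while (next != null) {
      out.collect(next)
      emitted += 1
      next = pending.poll()
    }
    emitted
  }
}
```

One consequence of this design is that buffered results are only emitted the next time the task thread runs the function, and anything still sitting in the buffer is not part of a checkpoint.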



------------------------------------------------------------------
From:Salva Alcántara <[hidden email]>
Send Time:2020 Feb. 12 (Wed.) 13:33
To:user <[hidden email]>
Subject:Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

I am working on a `CoProcessFunction` that uses a third party library for
detecting certain patterns of events based on some rules. So, in the end,
the `ProcessElement1` method is basically forwarding the events to this
library and registering a callback so that, when a match is detected, the
CoProcessFunction can emit an output event. For achieving this, the callback
relies on a reference to the `out: Collector[T]` parameter in
`ProcessElement1`.

Having said that, I am not sure whether this use case is well-supported by
Flink, since:

1. There might be multiple threads spanned by the third party library (let's
I have not any control over the amount of threads spanned, this is decided
by the library)
2. I am not sure whether `out` might be recreated or something by Flink at
some point, invalidating the references in the callbacks, making them crash

So far I have not observed any issues, but I have just run my program in the
small. It would be great to hear from the experts whether my approach is
valid or not.

PS: Also posted in
https://stackoverflow.com/questions/60181678/using-multithreaded-library-within-processfunction-with-callbacks-relying-on-the



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Kostas Kloudas-5
Hi Salva and Yun,

Yun is correct that the collector is not thread-safe, so writes to it
should be guarded.

In addition, a pattern that issues a request to a third-party
multi-threaded library and registers a callback for the future does
not play well with checkpointing. In your case, if a failure happens,
the data (or requests) that are "in flight" are not part of any
checkpoint, so you may lose data. Your pattern seems better
suited to the Async I/O pattern [1] supported by Flink, and it may make
sense to use that for your project.

I hope this helps,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html

On Wed, Feb 12, 2020 at 9:03 AM Yun Gao <[hidden email]> wrote:


Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Salva Alcántara
In reply to this post by Yun Gao
Hi Yun,

Thanks for your prompt and clear answer!

Salva




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Salva Alcántara
In reply to this post by Kostas Kloudas-5
Hi Kostas,

Thanks for your further comments. I will take a look at the AsyncIO pattern.

Regards,

Salva




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Salva Alcántara
In reply to this post by Kostas Kloudas-5
Would your comment still apply if I were using `AbstractStreamOperator` (passing
its output when registering the callbacks) instead of a UDF? Maybe the
situation changes if I use the Operator API instead...




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Kostas Kloudas-2
Hi Salva,

Yes, the same applies to the Operator API, as the output is not
thread-safe and there is no way of "checkpointing" the "in-flight"
data without explicit handling.
If you want to dig deeper, I would recommend also having a look at
the source code of the AsyncWaitOperator to see how you could work around
these limitations with a custom operator. In fact, you may also be
able to optimise your operator for your specific use case.

Cheers,
Kostas

On Wed, Feb 12, 2020 at 1:02 PM Salva Alcántara <[hidden email]> wrote:

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Salva Alcántara
I still need to dig into the AsyncWaitOperator, but after taking a look at
the Async I/O API, it seems that the normal use case is when you expect a
result for each element in the input stream, so you register a callback
together with a timeout for each input element. This is not exactly what my
use case requires. In particular, when I send an event to the third-party
library, I might get a result...or not. The library is used for detecting
certain patterns, so it is not like querying a database, where
you expect a result within a given time frame for each input element. In my
case, it is more the other way around: most of the time you will not be
expecting any outcome (think of anomaly detection). What I need is a way to
collect the result (if any) from my third-party library in my
ProcessFunction, knowing that these outcomes will be rare compared
with the cardinality of the input stream. After giving it some more thought,
I am not sure the Async I/O pattern really suits my needs...




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Piotr Nowojski-3
Hi,

As Kostas has pointed out, the operator and UDF APIs are not thread-safe, and Flink always calls them from the same single Task thread. This also includes checkpointing state. Also, as Kostas pointed out, the easiest way would be to try to use the AsyncWaitOperator. If that's not possible, you can implement your custom logic based on its code.

> So, in the end, the `processElement1` method basically forwards the events to this
> library and registers a callback so that, when a match is detected, the
> `CoProcessFunction` can emit an output event. To achieve this, the callback
> relies on a reference to the `out: Collector[T]` parameter of
> `processElement1`.

To achieve this, i.e. to emit from a different thread:

Pre Flink 1.10

In the past (before Flink 1.10, so including Flink 1.9), multi-threaded operators were supposed to acquire the so-called `checkpointingLock`. You can hold a reference to the output collector, but before emitting anything, you have to acquire `checkpointingLock`. Note that if you acquire it and don't release it for some period of time, the whole Task will be blocked from making any progress.
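The pre-1.10 rule above (keep a reference to the collector, but take the lock before every emit) boils down to the following shape. This is a hypothetical, self-contained model: `lock` stands in for the task's checkpoint lock, which real pre-1.10 code would obtain from Flink rather than create by hand, `Collector` is a stand-in for Flink's `Collector[T]`, and `LockedEmitter` is an illustrative name.

```scala
// Stand-in for Flink's Collector[T], which is not thread-safe.
trait Collector[T] { def collect(record: T): Unit }

// Hypothetical model of the pre-1.10 pattern. `lock` plays the role of the
// task's checkpoint lock; every callback thread must hold it while emitting,
// so emits never run concurrently with each other or with anything else
// that synchronizes on the same lock.
final class LockedEmitter[T](lock: AnyRef, out: Collector[T]) {
  def emitFromCallback(record: T): Unit = lock.synchronized {
    // Keep this section short: while the lock is held, the whole task is blocked.
    out.collect(record)
  }
}
```

As Piotr notes, this approach stops working once the lock is removed in Flink 1.11.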

Flink 1.10+

In Flink 1.10, `checkpointLock` was deprecated, and it will be removed in Flink 1.11. It is replaced by registering asynchronous runnable callbacks ("mails") that are executed by the task thread. So if you:
a) want to emit results produced by a custom thread, or
b) want to modify the operator's state as a result of some work done by a custom thread,
then in both cases this has to be done inside the "mail" action. For example, the pattern for a) is:

1. The external thread creates record R1 to emit.
2. The external thread creates a "mail" to emit record R1 and enqueues it into the mailbox.
3. The Task thread picks up the mail and executes its code, which emits record R1 from the Task thread.
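Steps 1 to 3 can be modelled with plain JDK classes as below. This is a simplified, hypothetical model of a mailbox, not Flink's own mailbox executor (whose exact methods differ between versions; the AsyncWaitOperator source shows real usage). Any thread may enqueue a mail, but only the task thread runs mails, so `out.collect` is never called from the library's threads. `Mailbox`, `Callbacks.onMatch` and the `Collector` trait are illustrative stand-ins.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Stand-in for Flink's Collector[T]; assumed not to be thread-safe.
trait Collector[T] { def collect(record: T): Unit }

// Hypothetical, simplified mailbox: any thread may enqueue a "mail",
// but only the task thread dequeues and runs them.
final class Mailbox {
  private val mails = new LinkedBlockingQueue[Runnable]()

  // Step 2: called from external (library) threads.
  def put(mail: Runnable): Unit = mails.put(mail)

  // Step 3: called from the task thread only; runs every mail queued so far
  // and returns how many were executed.
  def runPendingMails(): Int = {
    var executed = 0
    var mail = mails.poll()
    while (mail != null) {
      mail.run()
      executed += 1
      mail = mails.poll()
    }
    executed
  }
}

object Callbacks {
  // Steps 1 and 2: the library callback has produced record `r1` and, instead
  // of touching `out` itself, enqueues a mail that will emit it.
  def onMatch[T](mailbox: Mailbox, out: Collector[T], r1: T): Unit =
    mailbox.put(() => out.collect(r1))
}
```

Pattern b) has the same shape: the mail body updates operator state instead of (or in addition to) calling `out.collect`.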

For both of those patterns, please take a look at the AsyncWaitOperator code in the respective Flink versions. Just keep in mind that if you implement it using `checkpointingLock`, it will no longer work in Flink 1.11.

Piotrek

> On 13 Feb 2020, at 10:56, Salva Alcántara <[hidden email]> wrote:

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Salva Alcántara
Many thanks for your detailed response, Piotr; it helped a lot!

BTW, I got similar comments from Arvid Heise here:
https://stackoverflow.com/questions/60181678/using-multithreaded-library-within-processfunction-with-callbacks-relying-on-the.




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Piotr Nowojski-3
Glad that we could help :) Yes, Arvid’s response is spot on.

Piotrek

> On 13 Feb 2020, at 14:17, Salva Alcántara <[hidden email]> wrote:

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Salva Alcántara
BTW, is it possible to get the checkpoint lock within the `processElement`
method of a ProcessFunction, or is that not possible, so that I must switch to the
Operator API instead?




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Piotr Nowojski-3
You must switch to the Operator API to access the checkpointing lock. This was by design: the Operator API is not stable (`@PublicEvolving`), which is why we were able to deprecate `checkpointingLock` in Flink 1.10 and remove it in Flink 1.11.

Piotrek

> On 13 Feb 2020, at 14:54, Salva Alcántara <[hidden email]> wrote:

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Salva Alcántara
In reply to this post by Piotr Nowojski-3
Hi Piotr,

Since my current process function already works well for me, except for the
fact that I don't have access to the mailbox executor, I have simply created a
custom operator to inject it:

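```
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator

// Wraps MyFunction (a KeyedCoProcessFunction defined elsewhere, which exposes
// a `var mailboxExecutor`) so that its library callbacks can post "mails"
// back to the task thread.
class MyOperator(myFunction: MyFunction)
  extends KeyedCoProcessOperator(myFunction) {

  // Mailbox executor for this operator (relies on StreamTask internals, which
  // may change between Flink versions); the chain index is passed as the
  // mail priority. Lazy, so it is only created once the task is available.
  private lazy val mailboxExecutor = getContainingTask
    .getMailboxExecutorFactory
    .createExecutor(getOperatorConfig.getChainIndex)

  override def open(): Unit = {
    super.open()
    // Hand the executor to the user function before any element is processed.
    userFunction.asInstanceOf[MyFunction].mailboxExecutor = mailboxExecutor
  }
}
```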

This way I can send mails just fine. In the main application, I use it like
this:

```
.transform("wrapping my function with my operator", new MyOperator(new MyFunction()))
```

So far everything looks good to me, but if you see problems or know a better
way, it would be great to hear your thoughts on this again. In particular,
the way of getting access to the mailbox executor is a bit clumsy...




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Salva Alcántara
In reply to this post by Piotr Nowojski-3
Hi again Piotr,

I have further considered the mailbox executor approach and I think it will
not be enough for my purposes. Here is why:

- My state consists of models created with a third-party library.
- These models have their own state, which means that when I forward events
in `processElement1` to these models, each model's state will be updated
accordingly.

So, what would happen if:

- A new element E is processed in `processElement1` and sent to the third-party
library model.
- A checkpoint is taken, in particular snapshotting all the library models
in use.
- The element E that was sent to the library is expected to generate an
output O when the callback is called, but a failure happens before
that.
- The application recovers from the snapshot and continues processing elements,
but the callback generating the expected output O has been lost by now, so
that output will be lost.

Considering the above case, I realize that the only option for me might
be to rely on Async I/O. However, this is far from ideal because I do not
expect an output for each element I send to my models. I could use
a timeout, but that may slow down processing, as Async I/O has a limited queue
of "active" elements. Also, most of the time I do not expect a result
back from my models at all (callbacks will be invoked only a few times, since
my models are detecting anomalies).

In your opinion, what would be the best approach for handling this use case?




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Piotr Nowojski-3
Hi Salva,

Can't you take into account the pending element that is stuck somewhere in transit? Snapshot it as well and reprocess it during recovery? This is exactly what the AsyncWaitOperator does.

Piotrek

> On 5 Apr 2020, at 15:00, Salva Alcántara <[hidden email]> wrote:

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Salva Alcántara
I agree with your point, Piotrek: Async I/O would handle all the pending data
for me. However, the reason I did not want to use it is that in my
case, the callbacks are not always called in response to new data being sent
to the third-party lib. Indeed, the callback will be called rather
rarely (since in my case it means that an anomaly has been
detected). This means that if I go with Async I/O, I will need to set up a max
timeout for every element, when only a few of them will actually invoke the
callback (i.e., produce any data in response). This seems rather drastic,
because it will probably add a lot of unnecessary latency, but I agree
that maybe there is no other way if I need exactly-once guarantees.




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Piotr Nowojski-3
Hi,

By writing:

> Can't you take into account the pending element that is stuck somewhere in transit? Snapshot it as well and reprocess it during recovery? This is exactly what the AsyncWaitOperator does.

I didn't mean that you should use the AsyncWaitOperator itself, but rather what both Arvid and I suggested previously:

> Also, as Kostas pointed out, the easiest way would be to try to use the AsyncWaitOperator. If that's not possible, you can implement your custom logic based on its code.

You can copy and adjust the AsyncWaitOperator logic inside your custom operator. You don't have to use it as-is if you have special requirements; you can implement your own custom logic. Specifically, I meant mimicking the

org.apache.flink.streaming.api.operators.async.AsyncWaitOperator#queue

field and how it is used during state snapshotting and recovery.
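A minimal sketch of that bookkeeping, loosely following the idea behind `AsyncWaitOperator#queue`: every element handed to the library is remembered until it is known to be finished, the still-pending elements go into the snapshot, and on recovery they are sent to the library again. It is plain Scala with illustrative names (`PendingTracker`, `register`, `complete`, `restore`), not Flink's implementation, and it rests on two assumptions not established in this thread: that the library can signal completion for every element, including the many that never produce a match, and that the snapshotted model state does not already include the effects of the replayed elements (otherwise they would be applied twice).

```scala
import scala.collection.mutable

// Hypothetical sketch, to be driven from the task thread only (completions
// reported by library threads would be forwarded via a mail, not called directly).
final class PendingTracker[E] {
  // Elements sent to the library whose processing has not been confirmed yet,
  // keyed by a local sequence number and kept in arrival order.
  private val inFlight = mutable.LinkedHashMap.empty[Long, E]
  private var nextId = 0L

  // Register an element just before forwarding it to the library.
  def register(element: E): Long = {
    val id = nextId
    nextId += 1
    inFlight.put(id, element)
    id
  }

  // The library reports it is done with element `id` (match or no match).
  def complete(id: Long): Unit = { inFlight.remove(id); () }

  // For snapshotting: everything still in flight, in arrival order.
  def snapshot(): List[E] = inFlight.valuesIterator.toList

  // For recovery: re-register each saved element and hand it to `resend`,
  // which would forward it to the library again.
  def restore(saved: List[E], resend: (Long, E) => Unit): Unit =
    saved.foreach(e => resend(register(e), e))
}
```

In a real operator, the list returned by `snapshot()` would be written to operator state when the checkpoint is taken and read back during state initialization, roughly as the AsyncWaitOperator does with its own queue.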

Piotrek

On 9 Apr 2020, at 06:10, Salva Alcántara <[hidden email]> wrote:


Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

Salva Alcántara
Perfectly understood, thanks a lot for your reply and patience. I will take a
look at the AsyncWaitOperator and adapt from there if I really need to.


