I am working on a `CoProcessFunction` that uses a third-party library for detecting certain patterns of events based on some rules. So, in the end, the `processElement1` method is basically forwarding the events to this library and registering a callback so that, when a match is detected, the `CoProcessFunction` can emit an output event. To achieve this, the callback relies on a reference to the `out: Collector[T]` parameter of `processElement1`.

Having said that, I am not sure whether this use case is well supported by Flink, since:

1. There might be multiple threads spawned by the third-party library (let's say I have no control over the number of threads spawned; this is decided by the library).
2. I am not sure whether `out` might be recreated by Flink at some point, invalidating the references held by the callbacks and making them crash.

So far I have not observed any issues, but I have only run my program at a small scale. It would be great to hear from the experts whether my approach is valid or not.

PS: Also posted in https://stackoverflow.com/questions/60181678/using-multithreaded-library-within-processfunction-with-callbacks-relying-on-the

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi Salva,

As far as I know:

1. `out: Collector[T]` does not support multi-threaded access; only one thread may write records at a time. If multiple threads use `out`, access needs to be coordinated in some way (for example, with a lock, or with a queue that buffers records so that a single thread emits them through `out`).
2. With the current implementation, `out` is not recreated. However, I think this is implementation-dependent. If the processing logic still happens in the `processElement` method, is it possible to always use the `out` object passed into the method?

Best,
Yun
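The queue-based coordination mentioned in point 1 could look roughly like the following. This is only a minimal sketch in plain Scala with no Flink dependency: the `Collector` trait here is a hypothetical stand-in for Flink's collector, and `onMatch`, `drainTo` and `demo` are illustrative names. It assumes the owning thread gets a chance to drain the queue (for instance at the start of the next `processElement` call), and it addresses thread-safety only, not checkpointing.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for Flink's Collector[T]; assumed NOT to be thread-safe.
trait Collector[T] { def collect(record: T): Unit }

object QueueHandoffSketch {
  // Thread-safe buffer shared by the library's callback threads and the emitting thread.
  private val pending = new ConcurrentLinkedQueue[String]()

  // Called from any library thread: it never touches `out`, only the queue.
  def onMatch(result: String): Unit = pending.add(result)

  // Called only from the single thread that owns `out`.
  def drainTo(out: Collector[String]): Unit = {
    var next = pending.poll()
    while (next != null) {
      out.collect(next)
      next = pending.poll()
    }
  }

  // Small demonstration: 8 callbacks fire on a pool, one thread emits everything.
  def demo(): List[String] = {
    val emitted = ListBuffer.empty[String]
    val out = new Collector[String] {
      def collect(record: String): Unit = emitted += record
    }
    val pool = Executors.newFixedThreadPool(4)
    (1 to 8).foreach { i =>
      pool.execute(new Runnable { override def run(): Unit = onMatch(s"match-$i") })
    }
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    drainTo(out) // single-threaded emission
    emitted.toList.sorted
  }
}
```

The callbacks stay cheap (a single `add`), and the collector is only ever touched by one thread; the price is that results are emitted with some delay, whenever the owning thread next drains the queue.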
|
Hi Salva and Yun,
Yun is correct that the collector is not thread-safe, so writes to it must be guarded.

In addition, a pattern that issues a request to a third-party multi-threaded library and registers a callback for a future result does not play well with checkpointing. In your case, if a failure happens, the data (or requests) that are "in flight" are not part of any checkpoint, so you may lose data.

Your pattern seems better suited to the AsyncIO pattern [1] supported by Flink, and it may make sense to use that for your project.

I hope this helps,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html |
In reply to this post by Yun Gao
Hi Yun,
Thanks for your prompt and clear answer! Salva |
In reply to this post by Kostas Kloudas-5
Hi Kostas,
Thanks for your further comments. I will take a look at the AsyncIO pattern. Regards, Salva |
In reply to this post by Kostas Kloudas-5
Would your comment still apply if I was using AbstractStreamOperator (passing
its output when registering the callbacks) instead of a UDF? Maybe the situation changes if I use the Operator API instead... |
Hi Salva,
Yes, the same applies to the Operator API, as the output is not thread-safe and there is no way of "checkpointing" the "in-flight" data without explicit handling. If you want to dig deeper, I would recommend also having a look at the source code of the AsyncWaitOperator to see how you could bypass these limitations with a custom operator. In fact, you may also be able to optimise your operator for your specific use case. Cheers, Kostas |
I still need to get into the AsyncWaitOperator, but after taking a look at
the Async I/O API, it seems that the normal use case is when you expect a result for each element in the input stream, so you register a callback together with a timeout for each input element. This is not exactly what my use case requires. In particular, when I send an event to the third-party library, I might get a result... or not. The library is used for detecting certain patterns, so it is not like querying a database, where you expect a result within a given time frame for each input element. In my case it is rather the other way around: most of the time you will not be expecting any outcome (think of anomaly detection). What I need is a way to collect the results (if any) from my third-party library in my ProcessFunction, knowing that these outcomes will be rare compared with the cardinality of the input stream. After giving it some more thought, I don't know if the Async I/O pattern really suits my needs... |
Hi,
As Kostas has pointed out, the operator's and UDF's APIs are not thread-safe, and Flink always calls them from the same single Task thread. This also includes checkpointing state. Also as Kostas pointed out, the easiest way would be to try to use AsyncWaitOperator. If that's not possible, you can implement your custom logic based on its code.

> So, in the end, the `processElement1` method is basically forwarding the events to this library and registering a callback so that, when a match is detected, the CoProcessFunction can emit an output event. For achieving this, the callback relies on a reference to the `out: Collector[T]` parameter in `processElement1`.

To achieve this, that is, to emit from a different thread:

Pre Flink 1.10

Before Flink 1.10 (so including Flink 1.9), multi-threaded operators were supposed to acquire the so-called `checkpointingLock`. You can hold a reference to the output collector, but before emitting anything you have to acquire `checkpointingLock`. Note that if you acquire it and don't release it for some period of time, the whole Task will be blocked from making any progress.

Flink 1.10+

In Flink 1.10, `checkpointingLock` was deprecated, and it will be removed in Flink 1.11. It is replaced by registering asynchronous runnable callbacks, "mails", that are executed by the task thread. So if you want to:

a) emit results produced by a custom thread, or
b) modify the operator's state as a result of some work done by a custom thread,

then in both cases this has to be done inside the "mail" action. For example, the pattern for a) is:

1. The external thread creates record R1 to emit.
2. The external thread creates a "mail" to emit record R1 and enqueues it into the mailbox.
3. The Task's thread picks up the mail and executes its code, which emits record R1 from the Task thread.

For both of those patterns, please take a look at the AsyncWaitOperator code in the respective Flink versions. Just keep in mind that if you implement it using `checkpointingLock`, this will no longer work in Flink 1.11.

Piotrek |
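The three steps of the mail pattern can be illustrated without any Flink classes. The following is a rough sketch only: `TaskSketch`, `submitFromExternalThread` and `processMails` are hypothetical names, and the `LinkedBlockingQueue[Runnable]` merely plays the role of the task's mailbox; Flink's actual mailbox implementation is not used or reproduced here.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a task with a mailbox; not Flink's real classes.
final class TaskSketch {
  // The "mailbox": a thread-safe queue of actions to run on the task thread.
  private val mailbox = new LinkedBlockingQueue[Runnable]()

  // Stand-in for the output; it is only ever touched from inside a mail.
  private val emitted = ListBuffer.empty[String]

  // Steps 1 and 2: an external thread has a record and enqueues a mail that emits it.
  def submitFromExternalThread(record: String): Unit =
    mailbox.put(new Runnable { override def run(): Unit = emitted += record })

  // Step 3: the task thread takes mails one by one and executes them itself.
  def processMails(expectedMails: Int): List[String] = {
    (1 to expectedMails).foreach(_ => mailbox.take().run())
    emitted.toList
  }
}

object MailboxSketch {
  def demo(): List[String] = {
    val task = new TaskSketch
    val external = Executors.newSingleThreadExecutor() // plays the library's thread
    List("R1", "R2", "R3").foreach { r =>
      external.execute(new Runnable {
        override def run(): Unit = task.submitFromExternalThread(r)
      })
    }
    val result = task.processMails(3) // the calling thread plays the task thread
    external.shutdown()
    external.awaitTermination(5, TimeUnit.SECONDS)
    result
  }
}
```

The point of the design is that the external thread never runs the emitting code; it only describes the work, and the single task thread performs it, so no lock around the output is needed.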
Many thanks for your detailed response Piotr, it helped a lot!
BTW, I got similar comments from Arvid Heise here: https://stackoverflow.com/questions/60181678/using-multithreaded-library-within-processfunction-with-callbacks-relying-on-the. |
Glad that we could help :) Yes, Arvid’s response is spot on.
Piotrek |
BTW, is it possible to get the checkpoint lock within the processElement
method of a ProcessFunction, or is this not possible, so that I must switch to the Operator API instead? |
You must switch to the Operator API to access the checkpointing lock. This was by design: the Operator API is not stable (@PublicEvolving), which is why we were able to deprecate and remove `checkpointingLock` in Flink 1.10/1.11.
Piotrek |
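For readers unfamiliar with the lock-based approach, the idea of emitting from callback threads while holding a shared lock can be sketched as follows. This is a plain-Scala illustration under stated assumptions: `checkpointingLock` here is just a local object standing in for the real lock, the `ListBuffer` stands in for the non-thread-safe output, and none of this is Flink API.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ListBuffer

object LockSketch {
  def demo(): Int = {
    // Hypothetical stand-in for the operator's checkpointing lock.
    val checkpointingLock = new Object
    // Stand-in for the non-thread-safe output collector.
    val emitted = ListBuffer.empty[Int]

    // Every emission from a callback thread happens while holding the lock.
    def emit(record: Int): Unit = checkpointingLock.synchronized {
      emitted += record
    }

    val pool = Executors.newFixedThreadPool(4) // plays the library's threads
    (1 to 1000).foreach { i =>
      pool.execute(new Runnable { override def run(): Unit = emit(i) })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)

    // Read under the same lock so the count is consistent.
    checkpointingLock.synchronized(emitted.size)
  }
}
```

As noted earlier in the thread, holding such a lock for long blocks everyone else who needs it, so the critical section should contain the emission and nothing more.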
Ok many thanks again!
|
In reply to this post by Piotr Nowojski-3
Hi Piotr,
Since my current process function already works well for me, except for the fact that I don't have access to the mailbox executor, I have simply created a custom operator for injecting it:

```
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator

class MyOperator(myFunction: MyFunction) extends KeyedCoProcessOperator(myFunction) {

  // Created lazily so that the containing task is available when first accessed.
  private lazy val mailboxExecutor =
    getContainingTask
      .getMailboxExecutorFactory
      .createExecutor(getOperatorConfig.getChainIndex)

  override def open(): Unit = {
    super.open()
    // Inject the executor into the wrapped user function.
    userFunction.asInstanceOf[MyFunction].mailboxExecutor = mailboxExecutor
  }
}
```

This way I can send mails just fine. In the main application I use it like this:

```
.transform("wrapping my function with my operator", new MyOperator(new MyFunction()))
```

So far everything looks good to me, but if you see problems or know a better way, it would be great to hear your thoughts on this again. In particular, the way of getting access to the mailbox executor is a bit clumsy... |
In reply to this post by Piotr Nowojski-3
Hi again Piotr,
I have further considered the mailbox executor approach and I think it will not be enough for my purposes. Here is why:

- My state consists of models created with a third-party library.
- These models have their own state, which means that when I forward events to them in `processElement1`, the models' state is updated accordingly.

So, what would happen if:

- A new element E is processed in `processElement1` and sent to the third-party library model.
- A checkpoint is taken, in particular snapshotting all the library models in use.
- The element E that was sent to the library is expected to generate an output O when the callback is called, but a failure happens before that.
- The application recovers from the snapshot and continues processing elements, but the callback that would have generated the expected output O is gone by now, so that output is lost.

Considering the above case, I realize that the only option for me might be to rely on AsyncIO. However, this is far from ideal because I am not expecting an output for each element I send to my models. I could use a timeout, but that may slow down processing, as AsyncIO has a limited queue of "active" elements. Also, most of the time I am not expecting a result back from my models at all (callbacks will be invoked only a few times, since my models are detecting anomalies).

In your opinion, what would be the best approach for handling this use case? |
Hi Salva,
Can't you take into account the pending element that's stuck somewhere in transit? Snapshot it as well and reprocess it during recovery? This is exactly what AsyncWaitOperator is doing. Piotrek |
I agree with your point Piotrek, AsyncIO would handle all the pending data
for me. However, the reason I did not want to use it is that, in my case, the callbacks are not always called in response to new data being sent to the third-party lib. Indeed, the callback will be called rather rarely (since in my case it means that an anomaly has been detected). This means that if I go with AsyncIO I will need to set up a max timeout for every element, when only a few of them will actually invoke the callback (i.e., produce any data in response). This seems rather drastic because it will probably add a lot of unnecessary latency, but I agree that there may be no other way if I need exactly-once guarantees. |
Hi,
With:

> Can't you take into account the pending element that's stuck somewhere in transit? Snapshot it as well and reprocess it during recovery? This is exactly what AsyncWaitOperator is doing.

I didn't mean for you to use AsyncWaitOperator itself, but rather what both Arvid and I suggested previously:

> Also as Kostas pointed out, the easiest way would be to try to use AsyncWaitOperator. If that's not possible, you can implement your custom logic based on its code.

You can copy and adjust the AsyncWaitOperator logic inside your custom operator. You don't have to use it as-is if you have special requirements; you can implement your own custom logic. Specifically, I meant mimicking the org.apache.flink.streaming.api.operators.async.AsyncWaitOperator#queue field and how it is used during state snapshotting and recovery.

Piotrek
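The idea of snapshotting in-flight elements and reprocessing them on recovery can be sketched in plain Scala as below. This is not the AsyncWaitOperator code and uses no Flink state API: `PendingTrackingSketch`, its methods, the `String` element type and the `detect` function are all hypothetical, and the sketch ignores the separate question of keeping the third-party models' own state consistent with the snapshot.

```scala
import scala.collection.mutable

// Hypothetical operator-like class; `detect` stands in for the third-party library.
final class PendingTrackingSketch(detect: String => Option[String]) {
  // Elements handed to the library whose outcome is not yet known.
  private val pending = mutable.Queue.empty[String]

  // A new element arrives and is considered "in flight" until completed.
  def processElement(element: String): Unit = pending.enqueue(element)

  // The library has finished with the oldest element; it may or may not report a match.
  def complete(): Option[String] = detect(pending.dequeue())

  // On checkpoint: the in-flight elements become part of the snapshot.
  def snapshot(): List[String] = pending.toList

  // On recovery: put the snapshotted elements back so they are reprocessed.
  def restore(snapshotted: List[String]): Unit = {
    pending.clear()
    snapshotted.foreach(e => pending.enqueue(e))
  }
}

object PendingTrackingDemo {
  def demo(): List[String] = {
    val detect: String => Option[String] =
      e => if (e.startsWith("bad")) Some(s"ALERT($e)") else None

    val op = new PendingTrackingSketch(detect)
    op.processElement("ok-1")
    op.processElement("bad-2")
    val checkpoint = op.snapshot() // taken while both elements are still in flight

    // Simulated failure: a fresh instance is restored from the checkpoint.
    val recovered = new PendingTrackingSketch(detect)
    recovered.restore(checkpoint)
    checkpoint.flatMap(_ => recovered.complete())
  }
}
```

Because the pending elements travel with the checkpoint, the alert for `bad-2` is still produced after the simulated failure, even though the original callback never fired.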
|
Perfectly understood, thanks a lot for your reply and patience. I will take a look at AsyncWaitOperator and adapt from there if I really need that. |