RichAsyncFunction Timer Service


Mikhail Pryakhin-2
Hello, Flink community!

I need to access a timer service from a RichAsyncFunction implementation. Normally this is done through the StreamingRuntimeContext instance available in a RichFunction. Unfortunately, RichAsyncFunction, which extends RichFunction, overrides the "setRuntimeContext" method [1] and wraps the RuntimeContext passed as the method argument in a RichAsyncFunctionRuntimeContext instance [2]. This RichAsyncFunction-specific RuntimeContext implementation is private [2], so there is no way to reach the wrapped original RuntimeContext, and therefore no way to use the timer service in RichAsyncFunction implementations. Is there a reason for this? Could we make this implementation public, or otherwise expose the wrapped instance?
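To make the wrapping problem concrete, here is a minimal, self-contained model of it. All types below are simplified stand-ins invented for illustration (they are not Flink's real classes, and "StreamingContext" and "canReachTimeService" are hypothetical names); the sketch only shows why a cast to the streaming context stops working once a private wrapper is installed.

```java
/** Simplified stand-ins for illustration only; these are NOT Flink's real classes. */
final class WrappingSketch {

    interface RuntimeContext {
        String getTaskName();
    }

    /** Stand-in for a streaming context that additionally exposes a time service. */
    static final class StreamingContext implements RuntimeContext {
        @Override
        public String getTaskName() {
            return "async-task";
        }

        String getProcessingTimeService() {
            return "time-service";
        }
    }

    abstract static class RichFunction {
        private RuntimeContext context;

        void setRuntimeContext(RuntimeContext context) {
            this.context = context;
        }

        RuntimeContext getRuntimeContext() {
            return context;
        }
    }

    abstract static class RichAsyncFunction extends RichFunction {
        @Override
        void setRuntimeContext(RuntimeContext context) {
            // The original context is hidden behind a private wrapper.
            super.setRuntimeContext(new Wrapper(context));
        }

        /** Private wrapper: no accessor for the delegate and no time-service method. */
        private static final class Wrapper implements RuntimeContext {
            private final RuntimeContext delegate;

            Wrapper(RuntimeContext delegate) {
                this.delegate = delegate;
            }

            @Override
            public String getTaskName() {
                return delegate.getTaskName();
            }
        }
    }

    /** True only if the function still sees the streaming context itself. */
    static boolean canReachTimeService(RichFunction function) {
        return function.getRuntimeContext() instanceof StreamingContext;
    }

    public static void main(String[] args) {
        RichFunction plain = new RichFunction() {};
        plain.setRuntimeContext(new StreamingContext());
        RichFunction async = new RichAsyncFunction() {};
        async.setRuntimeContext(new StreamingContext());
        System.out.println("plain: " + canReachTimeService(plain));
        System.out.println("async: " + canReachTimeService(async));
    }
}
```

In this model the plain function can still downcast its context, while the async function only ever sees the wrapper.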

Many thanks in advance!




Kind Regards,
Mike Pryakhin



Re: RichAsyncFunction Timer Service

Dawid Wysakowicz-2

Hi Mike,

I think the reason there is no access to TimerService in an async function is that, because the function is asynchronous, there are no guarantees about when and where (at which stage of the pipeline) it will actually be executed. This characteristic doesn't align with TimerService and timely callbacks.

Best,

Dawid


Re: RichAsyncFunction Timer Service

Mikhail Pryakhin-2
Hi Dawid,
Thank you!

Yes, fair enough, but take for instance the BucketingSink class [1]: it is a RichFunction that uses the TimeService to execute time-based logic that is not directly associated with the event flow, for example closing files every n minutes. In an AsyncFunction I intended to use the TimeService the same way, for example to periodically reload configuration from an external source.
Does that make sense?


Kind Regards,
Mike Pryakhin


Re: RichAsyncFunction Timer Service

Guowei Ma
Hi
AFAIK, TimeService in Flink could guarantee the semantics of 
Best,
Guowei



Re: RichAsyncFunction Timer Service

Guowei Ma
Hi,

AFAIK, the TimeService in Flink can guarantee "at least once/exactly once" semantics after a failure.
If you only want to reload the config periodically, I think you could use a native Java Timer yourself.
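
A minimal sketch of that suggestion, using only the JDK: a small helper built on java.util.Timer that reloads a value on a daemon thread. The class name PeriodicReloader, its methods, and the Supplier-based loader are assumptions made for illustration; nothing here is part of Flink.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper: keeps a value fresh by reloading it on a daemon timer thread. */
final class PeriodicReloader<T> implements AutoCloseable {
    private final Supplier<T> loader;
    private final long periodMillis;
    private final AtomicReference<T> current;
    // Daemon thread, so the timer alone does not keep the JVM alive.
    private final Timer timer = new Timer("config-reloader", true);

    PeriodicReloader(Supplier<T> loader, long periodMillis) {
        this.loader = loader;
        this.periodMillis = periodMillis;
        // Load once up front so get() always has a value to return.
        this.current = new AtomicReference<>(loader.get());
    }

    /** Schedules the periodic reload; the first run happens after one period. */
    void start() {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                reloadNow();
            }
        }, periodMillis, periodMillis);
    }

    /** Reloads immediately; on failure the previous value is kept. */
    void reloadNow() {
        try {
            current.set(loader.get());
        } catch (RuntimeException e) {
            // Swallowed on purpose: an uncaught exception would terminate the Timer thread.
        }
    }

    /** Returns the most recently loaded value; safe to call from any thread. */
    T get() {
        return current.get();
    }

    @Override
    public void close() {
        timer.cancel();
    }

    public static void main(String[] args) {
        AtomicInteger version = new AtomicInteger();
        try (PeriodicReloader<String> config =
                new PeriodicReloader<>(() -> "config-v" + version.incrementAndGet(), 60_000L)) {
            config.start();
            System.out.println(config.get());
            config.reloadNow();
            System.out.println(config.get());
        }
    }
}
```

In a RichAsyncFunction, start() would presumably be called from open() and close() from close(), with asyncInvoke reading the value through get(). Note that this timer runs on its own thread, independently of Flink.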

Best,
Guowei



Re: RichAsyncFunction Timer Service

Mikhail Pryakhin-2
Thanks, Guowei, I see your point.
But I'm afraid there is no direct connection between delivery semantics and the TimeService.
Yes, a Java Timer is obviously the first thing that comes to mind, but it requires an extra thread to perform the background work. That is a drawback when only a strictly limited amount of resources is available to the TM (for example, when a job runs in a YARN or Kubernetes cluster). I just wanted to piggyback on the already existing TimeService.

The only possible reason I can see for not sharing the TimeService with an AsyncFunction is that Flink guarantees that onTimer(...) and processElement(...) calls are synchronized. But these guarantees could have worked for AsyncFunction as well.
So my question remains open: why is the TimeService unavailable to RichAsyncFunctions?
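
To illustrate what "synchronized" means here, the following is a small JDK-only sketch in which a timer callback and element processing run on different threads but are serialized by one shared lock. It is a model of the guarantee, not Flink's implementation; the class SynchronizedCallbacksSketch and the runDemo helper are hypothetical.

```java
/** Illustrative model only: timer callbacks and element processing share one lock. */
final class SynchronizedCallbacksSketch {
    private final Object lock = new Object();
    // Deliberately a plain long: it is only safe because every access holds the lock.
    private long counter;

    void processElement(String element) {
        synchronized (lock) {
            counter++;
        }
    }

    void onTimer(long timestamp) {
        synchronized (lock) {
            counter++;
        }
    }

    long counter() {
        synchronized (lock) {
            return counter;
        }
    }

    /** Calls each callback n times from two different threads and returns the final count. */
    static long runDemo(int n) {
        SynchronizedCallbacksSketch function = new SynchronizedCallbacksSketch();
        Thread timerThread = new Thread(() -> {
            for (int i = 0; i < n; i++) {
                function.onTimer(i);
            }
        });
        timerThread.start();
        for (int i = 0; i < n; i++) {
            function.processElement("element-" + i);
        }
        try {
            timerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return function.counter();
    }

    public static void main(String[] args) {
        // No update is lost, because the two callbacks never interleave.
        System.out.println(runDemo(100_000)); // prints 200000
    }
}
```

Without the shared lock, the two threads could interleave their updates and some increments could be lost.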

Thank you all for sharing your thoughts!

Kind Regards,
Mike Pryakhin
