Time extraction in Flink


Time extraction in Flink

andy
Hi guys,

I’m trying to write ELK logs from Flink; this would help us store/calculate the processing time of a group of operators for business auditing.

I’ve read about ProcessFunction and “Debugging Windows & Event Time” in the docs. They focus on “keyed” events and on monitoring via the web UI/metrics, whereas what I want is, for example:

val stream = env.addSource(kafkaSource).map(doX).map(doY).filter(Z)
AsyncDataStream.unorderedWait(stream, calApiBlah, timeout, TimeUnit.MILLISECONDS).addSink(kafkaSink)

Track the total time from `doY` up to just before the sink operator; this kind of time would be processing time.
Or I want to track the total time from source to sink for a stream event (end-to-end latency, maybe?).
So the target is to retrieve the total time spent in a chosen set of operators.

I think Flink probably supports this somehow; it’s just that I haven’t found it in the docs yet.

Thanks,

Andy,



Re: Time extraction in Flink

Biao Liu
Hi Andy,

As far as I know, Flink does not support a feature like that.

I would suggest recording and calculating the time in user code.
For example, add a timestamp field (maybe an array) to your record and append a timestamp to it in each processing step.
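Something like the following sketch (nothing built into Flink; the `Timed` wrapper, the stage names and the logging line are made up, while `env`, `kafkaSource`, `doX` and `doY` are taken from your example):

import java.lang.System.currentTimeMillis
import org.apache.flink.streaming.api.scala._

// Wrapper that carries the payload plus a trail of (stage, timestamp) marks.
case class Timed[T](value: T, stamps: Vector[(String, Long)]) {
  def mark(stage: String): Timed[T] =
    copy(stamps = stamps :+ (stage -> currentTimeMillis()))
}

val timed = env
  .addSource(kafkaSource)
  .map(r => Timed(r, Vector("source" -> currentTimeMillis())))
  .map(t => Timed(doX(t.value), t.stamps).mark("doX"))
  .map(t => Timed(doY(t.value), t.stamps).mark("doY"))

// Just before the sink, compute the elapsed time between any two marks
// and emit it, e.g. as the log line you ship to ELK.
val measured = timed.map { t =>
  val byStage = t.stamps.toMap
  val elapsedMs = byStage("doY") - byStage("source")
  println(s"source -> doY took $elapsedMs ms") // replace with your ELK logger
  t.value
}

The downside is that every record grows by a few longs, but the measurement stays entirely in user code and works for any set of operators you care about.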




Re: Time extraction in Flink

andy
Thanks Biao, I just wanted to make sure I wasn’t reinventing the wheel :)

