Setting the operator-id to measure percentile latency over several jobs

Felipe Gutierrez
Hi community,

I am tracking the latency of operators in Flink following this reference [1]. With Prometheus+Grafana I can query "flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency" and check the percentiles for each "operator_id" and each "operator_subtask_index". Each "operator_subtask_index" means each instance of the parallel physical operator, doesn't it?

How can I set a fixed ID for the "operator_id" in my code so I can identify quickly which operator I am measuring? I used map(new MyMapUDF()).uid("my-operator-ID"), but it seems that a hash function converts the string into a hash value. What is the hash function used so I can identify my operator? I know that I can use the REST API [2], and that if I name my operator it will always have the same hash when I restart the job, but I would like the metric to show a name I set.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#latency-tracking
[2] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rest-api-integration
--
Felipe Gutierrez
skype: felipe.o.gutierrez

Re: Setting the operator-id to measure percentile latency over several jobs

r_khachatryan
Hi Felipe,

Please find the answers to your questions below.

> Each "operator_subtask_index" means each instance of the parallel physical operator, doesn't it?
Yes.
> How can I set a fixed ID for the "operator_id" in my code so I can identify quickly which operator I am measuring?
You are using the correct API: uid(...).
> What is the hash function used so I can identify my operator?
Flink uses Guava's Hashing.murmur3_128(int): https://guava.dev/releases/18.0/api/docs/com/google/common/hash/Hashing.html#murmur3_128(int)

Regards,
Roman
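
Since the hash is MurmurHash3, the operator_id for an explicit uid can be computed offline. The following is a minimal sketch that needs only the JDK (it reimplements MurmurHash3 x64_128 instead of depending on Guava). It assumes, from our reading of Flink's StreamGraphHasherV2 and not from anything confirmed in this thread, that Flink hashes the UTF-8 bytes of the uid with murmur3_128 and seed 0, and that the metric shows the 16 hash bytes as lowercase hex. The class and method names (UidToOperatorId, operatorIdForUid, lookupTable) are hypothetical. Check one result against an ID from the REST API before relying on it.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper: recompute the hex operator_id for an operator whose uid
 * was set with .uid("..."). ASSUMPTION: Flink hashes the uid's UTF-8 bytes with
 * murmur3_128 (seed 0) and prints the 16 hash bytes as lowercase hex.
 */
public class UidToOperatorId {
    private static final long C1 = 0x87c37b91114253d5L;
    private static final long C2 = 0x4cf5ad432745937fL;

    // Final avalanche step of MurmurHash3.
    private static long fmix64(long k) {
        k ^= k >>> 33;
        k *= 0xff51afd7ed558ccdL;
        k ^= k >>> 33;
        k *= 0xc4ceb9fe1a85ec53L;
        k ^= k >>> 33;
        return k;
    }

    private static long mixK1(long k1) {
        return Long.rotateLeft(k1 * C1, 31) * C2;
    }

    private static long mixK2(long k2) {
        return Long.rotateLeft(k2 * C2, 33) * C1;
    }

    /** MurmurHash3 x64 128-bit, seed 0; returns h1 then h2, each little-endian (Guava's byte order). */
    static byte[] murmur3_128(byte[] data) {
        long h1 = 0, h2 = 0;
        ByteBuffer in = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
        int blocks = data.length / 16;
        for (int i = 0; i < blocks; i++) {
            h1 ^= mixK1(in.getLong(i * 16));
            h1 = Long.rotateLeft(h1, 27) + h2;
            h1 = h1 * 5 + 0x52dce729;
            h2 ^= mixK2(in.getLong(i * 16 + 8));
            h2 = Long.rotateLeft(h2, 31) + h1;
            h2 = h2 * 5 + 0x38495ab5;
        }
        // Remaining 0..15 bytes: bytes 0..7 go into k1, bytes 8..15 into k2.
        int tailStart = blocks * 16;
        long k1 = 0, k2 = 0;
        for (int i = tailStart; i < data.length; i++) {
            int pos = i - tailStart;
            long b = data[i] & 0xffL;
            if (pos < 8) {
                k1 ^= b << (8 * pos);
            } else {
                k2 ^= b << (8 * (pos - 8));
            }
        }
        // Mixing zero yields zero, so this is a no-op when the tail is empty.
        h1 ^= mixK1(k1);
        h2 ^= mixK2(k2);
        h1 ^= data.length;
        h2 ^= data.length;
        h1 += h2;
        h2 += h1;
        h1 = fmix64(h1);
        h2 = fmix64(h2);
        h1 += h2;
        h2 += h1;
        return ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN).putLong(h1).putLong(h2).array();
    }

    /** Lowercase hex of the hash, the form we assume appears in the operator_id label. */
    public static String operatorIdForUid(String uid) {
        StringBuilder hex = new StringBuilder(32);
        for (byte b : murmur3_128(uid.getBytes(StandardCharsets.UTF_8))) {
            hex.append(String.format("%02x", b & 0xff));
        }
        return hex.toString();
    }

    /** Local reverse table operator_id -> uid for the uids used in one job. */
    public static Map<String, String> lookupTable(List<String> uids) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String uid : uids) {
            table.put(operatorIdForUid(uid), uid);
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> uids = args.length > 0 ? Arrays.asList(args) : Arrays.asList("my-operator-ID");
        lookupTable(uids).forEach((id, uid) -> System.out.println(id + "  " + uid));
    }
}
```

Running it with a job's uids (for example, java UidToOperatorId my-operator-ID) prints one computed ID per uid. This gives a local operator_id-to-uid table for reading Grafana panels. It only helps for operators with an explicit uid, because auto-generated IDs depend on the job graph.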

In reply to Felipe Gutierrez (Thu, Mar 5, 2020, 12:45 PM).

Re: Setting the operator-id to measure percentile latency over several jobs

Felipe Gutierrez
Thanks! I was wondering why the latency metrics don't include the operator name, since the other metrics do.
But thanks anyway!

In reply to Khachatryan Roman (Thu, Mar 5, 2020, 2:06 PM).

Re: Setting the operator-id to measure percentile latency over several jobs

rmetzger0
[hidden email]: Does it make sense to file a ticket to add the operator name to the latency metrics as well?

In reply to Felipe Gutierrez (Thu, Mar 5, 2020, 4:31 PM).

Re: Setting the operator-id to measure percentile latency over several jobs

rmetzger0
I talked to Chesnay about this offline. Shipping the operator names with the latency markers would significantly increase the markers' size, which could affect performance. Also, there is no global lookup from operatorId to operatorName.

In reply to Robert Metzger (Fri, Mar 6, 2020, 5:34 PM).