Task and Operator Monitoring via JMX / naming

Task and Operator Monitoring via JMX / naming

Philipp Bussche
Hi there,
I am struggling to understand what I am looking at after enabling JMX metric reporting on my taskmanager.
The job I am trying this out with has 1 source, 2 map functions (where one is a RichMap) and 3 sinks.
This is how I have written my Job:

DataStream<Invitation> invitations = streaming
        .addSource(new FlinkKafkaConsumer09<>(
                dsp.getPropertyAsString("kafka.invitation.topic"),
                new InvitationSchema(),
                kafkaProps))
        .name("KafkaSource");
invitations.addSink(new PostgresqlInvitationDetailsSink<>(psqlConfig)).name("InvitationDetailSink");

DataStream<Tuple2<String, String>> tokens = invitations
        .map(new TokenExtractor())
        .name("TokenMapStream");
tokens.addSink(new PostgresqlInvitationTokenSink<>(psqlConfig)).name("InvitationTokenSink");

DataStream<Tuple4<String, String, String, String>> invitationResponses = invitations
        .map(new InvitationDetailsExtractor(psqlConfig, tokenToSenderMapping))
        .name("InvitationDetailsRichMapStream");
invitationResponses.addSink(new Neo4JInvitationSink<>(neo4jConfig)).name("InvitationRelationshipSink");

streaming.execute("InvitationJob");

I was expecting to see metrics representing the source, the sinks and the operators, but instead of 6 entries in my JMX tree I only have 4. Please see the attached screenshot. I was also expecting the JMX objects to be named like my task / operator names, but the names have all sorts of prefix/suffix magic around them. Finally, I have one custom metric which is attached to my RichMapFunction (InvitationDetailsExtractor). However, this custom metric (invitationDetailsAdded) shows up under an object where one of the keys (which I would expect to be set to the operation name) is a combination of the prefix "Sink", the name of the first sink that I am using, and the name of the first map function, which is not actually the RichMapFunction. That is, my custom metric "invitationDetailsAdded" shows up under "(Sink-_InvitationDetailSink-_TokenMapStream", which is very confusing because this metric is actually incremented as part of InvitationDetailsRichMapStream.

Can somebody please explain what I can expect from metrics exposed via JMX (should they really represent my tasks and operations?) and why the naming is so strange?
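
For anyone who wants to look at the registered names outside of a JMX console, the following is a minimal sketch that lists MBean object names using only the JDK's javax.management API. It is not taken from Flink. The class name ListMetricBeans, the host:port argument and the object name pattern are assumptions made for illustration, and connecting remotely assumes that the taskmanager JVM was started with a remote JMX port enabled.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ListMetricBeans {

    /** Returns the canonical names of all MBeans matching the pattern, sorted. */
    static List<String> listNames(MBeanServerConnection server, String pattern) {
        try {
            List<String> result = new ArrayList<>();
            for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
                // The canonical name contains the domain and all key=value properties.
                result.add(name.getCanonicalName());
            }
            Collections.sort(result);
            return result;
        } catch (MalformedObjectNameException | IOException e) {
            throw new IllegalStateException("Could not query MBeans for " + pattern, e);
        }
    }

    public static void main(String[] args) throws IOException {
        // args[1] (optional): object name pattern; "*:*" matches every MBean.
        // A narrower pattern such as "org.apache.flink*:*" is an assumption.
        String pattern = args.length > 1 ? args[1] : "*:*";
        if (args.length == 0) {
            // No arguments: inspect the MBeans of this JVM.
            listNames(ManagementFactory.getPlatformMBeanServer(), pattern)
                    .forEach(System.out::println);
            return;
        }
        // args[0]: host:port of a JVM with remote JMX enabled (assumption).
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + args[0] + "/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            listNames(connector.getMBeanServerConnection(), pattern)
                    .forEach(System.out::println);
        }
    }
}
```

Printing the canonical names makes it easier to compare the individual key/value pairs of each object with the names given to the operators in the job.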

Thanks
Philipp

Re: Task and Operator Monitoring via JMX / naming

Chesnay Schepler
Hello Philipp,

there is certainly something very wrong here.

What you _should_ see is 6 entries, one for each operator, plus 2-3 more
for the tasks the operators are executed in and for the taskmanager itself.

Usually, operator metrics use the name that you configured, like
"TokenMapStream", whereas tasks use the concatenation of all operator
names joined with =>, as in "KafkaSource => TokenMapStream".

I will look into this, I've never seen these issues before.

One more thing, which version of Flink are you currently using?

Regards,
Chesnay


On 15.10.2016 00:07, Philipp Bussche wrote:

> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n9560/Screen_Shot_2016-10-15_at_00.png>

Re: Task and Operator Monitoring via JMX / naming

Philipp Bussche
Thanks Chesnay, this is on Flink 1.1.3.
Please also note that, for example, the first item in the list, which has the custom metric attached to it, starts with a leading "(". It might be that the parsing of the names is not working quite as expected.
I was trying to find out where these names come from but wasn't able to identify it in the source. If you know and want to give me a hint, I can also do some more debugging!
Thanks
Philipp

Re: Task and Operator Monitoring via JMX / naming

Chesnay Schepler
Hello Philipp,

the relevant names are stored in the OperatorMetricGroup/TaskMetricGroup
classes in flink-runtime.

The name for a task is extracted directly from the
TaskDeploymentDescriptor in TaskManagerJobMetricGroup#addTask().
The name for a streaming operator that the metric system uses is set in
AbstractStreamOperator#setup() and is derived
from the task name.

Regards,
Chesnay

On 15.10.2016 10:08, Philipp Bussche wrote:

> Thanks Chesnay, this is on Flink 1.1.3
> Please also note that e.g. the first item in the list which has the custom
> metric attached to it starts with a leading "(". It might be that the
> parsing of the names is not working quite as expected.
> I was trying to find out where these names come from but wasn't able to
> identify it in the source. If you know and want to give me a hint I can also
> do some more debugging !
> Thanks
> Philipp

Re: Task and Operator Monitoring via JMX / naming

Philipp Bussche
Thanks Chesnay.

I had a look at what the JMX representation looks like for a Task Manager which has one of the example jobs deployed (https://ci.apache.org/projects/flink/flink-docs-release-1.1/quickstart/run_example_quickstart.html), and this looks correct.
I assume at this point that the naming gets confused because I have multiple sinks in my job and more than one operator on the same stream. Maybe this is not expected and I should only have one operator and one sink per job? However, the job itself does what it is supposed to, so as it stands right now I would only change this for the sake of monitoring.
Also, it seems to make a difference at which point in the job things happen.
I had a print (sink) of the Wikipedia source stream right after the source is read, and after moving this print statement to the very end of the job class the representation in JMX changed. I would expect the naming of sinks and operators to always be the same regardless of when they happen, no?

Thanks
Philipp

Re: Task and Operator Monitoring via JMX / naming

Philipp Bussche
Some further observations: I had a job which was taking events off a Kafka topic and sending them to two sinks, where for one of them a map operation would happen first. When creating one event stream and sending it to the two sinks, the JMX representation was not showing both sinks and the naming of the map operation was also not right. But when creating two event streams in the job (basically two Kafka consumers doing the exact same thing) and then sending each to one sink, the naming changed and seems to look like what I would expect.
A question remains though: is it best practice anyway to do only one thing per job (like one map operation and one distribution to a sink), so that having multiple streams is the way to go, or is what I see in my environment still unexpected behaviour that should be fixed?
Thanks

Re: Task and Operator Monitoring via JMX / naming

Chesnay Schepler
This is completely unintended behavior; you should never have to adjust
your topology so that the metric system gets the names right.

I'll take a deep look into this tomorrow ;)

Regards,
Chesnay

On 20.10.2016 08:50, Philipp Bussche wrote:

> Some further observations: I had a Job which was taking events of a Kafka
> topic and sending it to two sinks whereas for one of them a Map operation
> would happen first. When creating one event stream and sending it to the two
> sinks the JMX representation was not showing both sinks and the naming of
> the Map operation was also not right. But when creating two event streams in
> the job (basically two Kafka consumers doing the exact same) and then
> sending each to one sink the naming changed and seem to look like what I
> would expect.
> A question remains though if it is best practise anyways to do one thing
> with a Job only (like one map operation and one distribution to a sink) and
> hence having multiple streams is the way to go or if this is still
> unexpected behaviour what I see in my environment and should be fixed ?
> Thanks

Re: Task and Operator Monitoring via JMX / naming

Philipp Bussche
Thanks Chesnay,
I am happy to share more about my environment and do additional testing for this.
I would also be happy to help with a fix if we see that there might be an issue in the code somewhere.
In fact I am still trying to get a Hacktoberfest T-Shirt and I am still pull requests short  ;)

Re: Task and Operator Monitoring via JMX / naming

Chesnay Schepler
Well the issue is the following:

the metric system assumes the following naming scheme for tasks, based on
the DataSet API and simple streaming jobs:
[CHAIN] operatorName1 [=> operatorName2 [ ...]]
To retrieve the operator name the above is split by "=>", giving us a
String[] of all operator names in a task, from which we then select the
correct one based on the position in the chain.

However, the Streaming API has some fancy chaining going on, where
multiple operations can be chained to a single one, which results in a
name like this: operatorName1 => (operatorName2, operatorName3)

For both op2 and op3 the chain index is identical (since for a tree
structure the index is the depth), resulting in both picking
(operatorName2, operatorName3) as their name, which is obviously wrong.
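
The effect described above can be reproduced with a few lines of plain Java. This is only a sketch of the splitting logic as described in this message, not the actual Flink code; the class ChainNameDemo and the method inferOperatorName are hypothetical names, and the optional CHAIN prefix is ignored.

```java
public class ChainNameDemo {

    /**
     * Mimics the inference described above: split the task name by "=>"
     * and pick the entry at the operator's position in the chain.
     */
    static String inferOperatorName(String taskName, int chainIndex) {
        String[] parts = taskName.split("=>");
        return parts[chainIndex].trim();
    }

    public static void main(String[] args) {
        // Linear chain: every chain index maps to exactly one operator.
        String linear = "KafkaSource => TokenMapStream";
        System.out.println(inferOperatorName(linear, 0)); // KafkaSource
        System.out.println(inferOperatorName(linear, 1)); // TokenMapStream

        // Branching chain: operatorName2 and operatorName3 share chain index 1,
        // so both end up with the same combined name.
        String branching = "operatorName1 => (operatorName2, operatorName3)";
        System.out.println(inferOperatorName(branching, 1)); // (operatorName2, operatorName3)
    }
}
```

The last call shows why a name with a leading "(" can end up attached to an operator: the whole parenthesised group is returned for every operator at that depth.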

The solution (which I already implemented, sorry for that) is to simply
stop inferring the operator names from the task (it was hacky to begin
with) and just encode them in the configuration for the operator.
This can be seen here:
https://github.com/zentol/flink/commit/7f439525a26504e98b72f2d39b987ac878464419
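
As a rough illustration of that idea (not the code from the linked commit), each operator can carry its own name in its configuration, so nothing has to be parsed out of the task name. The OperatorConfig class and the "operatorName" key below are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;

public class ExplicitOperatorNameDemo {

    /** Hypothetical stand-in for a per-operator configuration object. */
    static final class OperatorConfig {
        private static final String NAME_KEY = "operatorName"; // hypothetical key
        private final Map<String, String> values = new HashMap<>();

        void setOperatorName(String name) {
            values.put(NAME_KEY, name);
        }

        String getOperatorName() {
            return values.get(NAME_KEY);
        }
    }

    public static void main(String[] args) {
        // Two operators at the same depth of the chain each keep their own name.
        OperatorConfig op2 = new OperatorConfig();
        op2.setOperatorName("operatorName2");
        OperatorConfig op3 = new OperatorConfig();
        op3.setOperatorName("operatorName3");

        System.out.println(op2.getOperatorName()); // operatorName2
        System.out.println(op3.getOperatorName()); // operatorName3
    }
}
```

With the name stored per operator, the chain index no longer plays any role in naming, so branching topologies are handled the same way as linear ones.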

Regards,
Chesnay

On 20.10.2016 14:21, Philipp Bussche wrote:

> Thanks Chesnay,
> I am happy to share more around my environment and do additional testing for
> this.
> Also I would be happy to help fixing if we see there might be an issue in
> the code somewhere.
> In fact I am still trying to get a Hacktoberfest T-Shirt and I am still pull
> requests short  ;)

Re: Task and Operator Monitoring via JMX / naming

Philipp Bussche
Thanks Chesnay !

I am not too familiar with the release cycles here, but I was wondering when one could expect your fix to be in the master of Flink? Should I maybe create my own build for the moment?

Thanks.

Re: Task and Operator Monitoring via JMX / naming

Chesnay Schepler
It will be in the master tomorrow.

On 20.10.2016 18:50, Philipp Bussche wrote:

> Thanks Chesnay !
>
> I am not too familiar with the release cycles here but was wondering when
> one could expect your fix to be in the master of Flink ? Should I create my
> own build for the moment maybe ?
>
> Thanks.