Flink UI records received/sent chained-unchained operators

Luis Lázaro
Hi everyone, 
my use case:

• Flink 1.3.0-SNAPSHOT
• DataStream API.
• Cluster mode is standalone, with 3 TaskManagers and 1 JobManager.
• Parallelism is set to 1 at the client level, from the UI.
• Source is Kafka configured with one single partition.

In Flink's UI, I am getting a “Records sent” count that is greater than the “Records received” count. Despite this, the total number of records is correct, i.e., I am neither losing records nor getting extra ones. Please check out screenshot #1:



My question is: why is the number of records sent minus the number received exactly the number of filtered records (250 - 223 = 27)? I expected to see 27 “Records sent”.
Does Map->Filter->Filter, chained as a single operator, behave like a SplitStream?

.split(
  case false => 223
  case true  => 27
)

If I run another job with the same execution plan but with the operators of the combined operator (Map->Filter->Filter) unchained, the “Records sent” count exactly matches the count of filtered records. Please check out screenshot #2.




P.S.: There is a similar question in the thread “Flink UI and records sent”, but I think they are not related.

Thanks in advance, any suggestions are much appreciated.

Best regards, Luis



Re: Flink UI records received/sent chained-unchained operators

Chesnay Schepler
Hello,

I will try to reproduce this later today. My current assumption is that the aggregation logic in the web UI for some reason adds up the wrong counters; it's unlikely that the counts match just by chance.

Regards,
Chesnay

(In reply to Luis Lázaro's message of 24.05.2017 10:15.)




Re: Flink UI records received/sent chained-unchained operators

Chesnay Schepler
I haven't been successful in reproducing this issue, nor did looking through the code provide any leads.

Does this happen every time? If so, could you enable the JMX reporter and check the numRecordsIn/-Out metrics for all operators/tasks?
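
For anyone who wants to run this check, here is a minimal sketch of dumping the record counters over JMX with the standard javax.management API. It rests on assumptions that are not confirmed in this thread: that Flink's JMX reporter registers its MBeans under a domain starting with "org.apache.flink.", that the metric name (numRecordsIn / numRecordsOut) appears somewhere in the object name, and that counters expose their value as an attribute named "Count". The class and method names are made up for illustration.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.AttributeNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical helper, not part of Flink.
class RecordCounterDump {

    /**
     * Collects the "Count" attribute of every MBean in the given domain pattern
     * whose object name mentions numRecordsIn or numRecordsOut.
     * ASSUMPTION: the domain pattern and the attribute name match what the
     * Flink JMX reporter actually registers.
     */
    static Map<String, Object> readRecordCounters(MBeanServerConnection server, String domainPattern) {
        Map<String, Object> counters = new TreeMap<>();
        try {
            ObjectName pattern = new ObjectName(domainPattern + ":*");
            for (ObjectName name : server.queryNames(pattern, null)) {
                String fullName = name.getCanonicalName();
                if (!fullName.contains("numRecordsIn") && !fullName.contains("numRecordsOut")) {
                    continue; // some other metric
                }
                try {
                    counters.put(fullName, server.getAttribute(name, "Count"));
                } catch (AttributeNotFoundException e) {
                    // Not a counter-style MBean (for example a rate); skip it.
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException("JMX query failed", e);
        }
        return counters;
    }

    public static void main(String[] args) {
        // Queries the local JVM only; in a JVM without Flink this prints nothing.
        MBeanServerConnection local = ManagementFactory.getPlatformMBeanServer();
        readRecordCounters(local, "org.apache.flink.*")
                .forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

To inspect a running TaskManager, the same method could be given a remote MBeanServerConnection obtained through JMXConnectorFactory; the host and port depend on how the reporter is configured.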

(In reply to Chesnay Schepler's message of 24.05.2017 10:29.)





Re: Flink UI records received/sent chained-unchained operators

Luis Lázaro


Hi Chesnay, thank you very much for your help.

I'll show screenshots of the tests I've done:

  • snapshot#3: of the two filters in the combined operator (map->filter->filter), I removed the last one and added its predicate as an extra condition of the first one.
   The “Records sent” counter is correct.



  • snapshot#4: metrics for execution plan snapshot#3

  • snapshot#5: to your question “Does this happen every time?”:
   Yes, it seems to happen every time the combined operator (map->filter->filter) chains two filters.

  • snapshot#6: metrics for execution plan snapshot#5

  • snapshot#7: similar to screenshot#2 (first mail). The “Records sent” counter is correct.


At the moment I cannot provide metrics via JMX.
I will continue testing.

Thanks again Chesnay.

Best regards, Luis.





Re: Flink UI records received/sent chained-unchained operators

Luis Lázaro
Hi, 
I am now reporting metrics via JMX and getting similar results:

  • “Records received” < “Records sent” if the operator is Map->filter(condition1)->filter(condition2), as in the snapshot.
  • If the operator is Map->filter(condition1 && condition2), the results are correct (“Records sent” = 803).
  • The next operator (the sink) receives Previous_sent - Previous_received records (5332 - 4529 = 803).



Thanks in advance, any suggestions are much appreciated.

Best regards, Luis




Re: Flink UI records received/sent chained-unchained operators

Chesnay Schepler
I'm pretty sure I know what's going on. If I'm correct, giving the second Filter another name (by calling filter(...).name("superUniqueFilter")) should resolve the issue.

The metric system uses the operator names to distinguish operators, because Flink currently doesn't expose a unique ID for them. (Such IDs exist, but are only used in a very small part of the code at the moment.)

So what happens is that both filters count their output, and due to some internal technicalities this means that the output of the task is the output of both filters, added together.

Your first filter apparently isn't filtering anything (so it emits 4529 records); adding the output of the second filter (803), we arrive at 5332.
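
The arithmetic above can be reproduced with a small toy model. This is only a sketch under stated assumptions, not Flink's actual metric code: it assumes counters are looked up purely by operator name, so two filters with the same name share one counter, and that the task's output is read from the counter found under the last operator's name. The class and method names are hypothetical; the record counts (4529 and 803) are the ones from this thread.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;

// Toy model only; none of these types exist in Flink.
class ChainedCounterSketch {

    /** A filter stage plus the operator name it registers its counter under. */
    static final class NamedFilter {
        final String name;
        final IntPredicate predicate;

        NamedFilter(String name, IntPredicate predicate) {
            this.name = name;
            this.predicate = predicate;
        }
    }

    /** Runs records 0..records-1 through the chain; counters are keyed by name only. */
    static Map<String, Long> runChain(int records, List<NamedFilter> chain) {
        Map<String, Long> recordsOutByName = new HashMap<>();
        for (int record = 0; record < records; record++) {
            for (NamedFilter filter : chain) {
                if (!filter.predicate.test(record)) {
                    break; // record dropped, later stages never see it
                }
                // Same name means same counter, so both filters add to one number.
                recordsOutByName.merge(filter.name, 1L, Long::sum);
            }
        }
        return recordsOutByName;
    }

    /** Toy "task output": the counter stored under the second (last) operator's name. */
    static long taskRecordsOut(String firstName, String secondName) {
        List<NamedFilter> chain = Arrays.asList(
                new NamedFilter(firstName, r -> true),       // passes all 4529 records
                new NamedFilter(secondName, r -> r < 803));  // passes 803 records
        return runChain(4529, chain).getOrDefault(secondName, 0L);
    }

    public static void main(String[] args) {
        System.out.println(taskRecordsOut("Filter", "Filter"));            // prints 5332
        System.out.println(taskRecordsOut("Filter", "superUniqueFilter")); // prints 803
    }
}
```

With both filters named "Filter" the shared counter ends up at 4529 + 803 = 5332, which matches the inflated “Records sent” value; with a unique name for the second filter the counter shows only its own 803 records.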

(In reply to Luis Lázaro's message of 01.06.2017 16:14.)





Re: Flink UI records received/sent chained-unchained operators

Luis Lázaro
Hi Chesnay, thank you very much for your help.
After giving each operator a name, the “Records sent” counter is correct, i.e.,

 map.filter(condition1).name(...).filter(condition2).name(...)

thanks,
best regards

Luis.