Hi everyone,
My use case:
• Flink 1.3.0-SNAPSHOT
• DataStream API
• Cluster mode is standalone, with 3 TaskManagers (TM) and a JobManager (JM).
• Parallelism is set to 1 at the client level, from the UI.
• The source is Kafka, configured with a single partition.

In the Flink UI counters, the "Records sent" count is greater than the "Records received" count. Despite this, the total number of records is correct, i.e., I am neither losing nor gaining records. Please see screenshot #1.

My question is: why is the number of records sent minus the number received exactly the number of records that pass the filters (250 - 223 = 27)? I expected to see 27 "Records sent". Is Map->Filter->Filter, as a single chained operator, behaving like a SplitStream, i.e. .split(case false => 223, case true => 27)?

If I run another job with the same execution plan but with the operators unchained (MAP, FILTER and FILTER as separate operators instead of the single chained operator MAP->FILTER->FILTER), the "Records sent" count is exactly the number of records that pass the filters. Please see screenshot #2.

P.S. There is a similar question, "Flink UI and records sent", but I think the two are not related.

Thanks in advance; any suggestion is much appreciated.

Best regards,
Luis
Hello,
I will try to reproduce this later today. My current assumption is that the aggregation logic in the web UI, for some reason, adds up the wrong counters; it's unlikely that the counts match just by chance.

Regards,
Chesnay

On 24.05.2017 10:15, Luis Lázaro wrote:
I haven't been successful in
reproducing this issue, nor did looking through the code provide
any leads.
Does this happen every time? If so, could you enable the JMX reporter and check the numRecordsIn/-Out metrics for all operators/tasks?

On 24.05.2017 10:29, Chesnay Schepler wrote:
Hi Chesnay, thank you very much for your help. Here are screenshots of the tests I've done:
The "Records sent" counter is correct.
Does this happen every time? Yes, it seems to happen every time the chained operator (map->filter->filter) contains two filters.
At the moment I cannot provide metrics via JMX, but I will continue testing.

Thanks again, Chesnay.

Best regards,
Luis
Hi, I am now reporting metrics via JMX and getting similar results:
Thanks in advance; any suggestion is much appreciated.

Best regards,
Luis
I'm pretty sure I know what's going on. If
I'm correct, giving the second filter a different name (by calling
filter(...).name("superUniqueFilter")) should resolve the issue.
The metric system uses operator names to distinguish operators, because Flink currently doesn't expose a unique ID for them. (Such IDs exist, but are only used in a very small part of the system at the moment.) So what happens is that both filters, sharing the same name, count their output, and due to some internal technicalities the output of the task is reported as the output of both filters added together. Your first filter apparently isn't filtering anything, so it emits 4,529 records; adding the output of the second filter (803) gives 4,529 + 803 = 5,332.

On 01.06.2017 16:14, Luis Lázaro wrote:
Hi Chesnay, thank you very much for your help.
After naming the operators, the "Records sent" counter is correct, i.e., with map.filter(condition1).name(...).filter(condition2).name(...).

Thanks, best regards,
Luis
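For anyone who wants to see the arithmetic behind Chesnay's explanation, here is a small plain-Java model that runs without Flink. It is a sketch under stated assumptions, not Flink's metric code: the class `NameKeyedCounters` and its methods are hypothetical, output counters are kept in a map keyed by operator name (so operators sharing a name add into one counter), and the task's "Records sent" is assumed to be read from the counter registered under the last operator's name. With both filters named "Filter", that counter holds the first filter's output plus the second's; with a unique name for the second filter, it holds only the records that actually leave the chain.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical model of name-keyed operator metrics; NOT Flink's actual implementation.
public class NameKeyedCounters {

    // One operator in a chain: a display name and the predicate it applies.
    // A pass-through operator (like the Map here) simply keeps every record.
    static final class Op {
        final String name;
        final Predicate<Integer> keep;

        Op(String name, Predicate<Integer> keep) {
            this.name = name;
            this.keep = keep;
        }
    }

    // Pushes every record through the chain and counts emitted records per
    // operator NAME: operators that share a name add into the same counter.
    static Map<String, Long> countOutputs(List<Integer> input, List<Op> chain) {
        Map<String, Long> outCounters = new LinkedHashMap<>();
        for (int value : input) {
            for (Op op : chain) {
                if (!op.keep.test(value)) {
                    break; // record dropped; later operators never see it
                }
                outCounters.merge(op.name, 1L, Long::sum);
            }
        }
        return outCounters;
    }

    // Assumption: the task's "Records sent" is the counter registered under
    // the name of the last operator in the chain.
    static long recordsSent(List<Integer> input, List<Op> chain) {
        String lastName = chain.get(chain.size() - 1).name;
        return countOutputs(input, chain).getOrDefault(lastName, 0L);
    }

    // Map -> Filter -> Filter: the first filter keeps everything, the second
    // keeps `passed` of the `received` records.
    public static long demo(int received, int passed, boolean uniqueNames) {
        List<Integer> input = IntStream.range(0, received).boxed().collect(Collectors.toList());
        List<Op> chain = Arrays.asList(
                new Op("Map", x -> true),
                new Op("Filter", x -> true),
                new Op(uniqueNames ? "superUniqueFilter" : "Filter", x -> x < passed));
        return recordsSent(input, chain);
    }

    public static void main(String[] args) {
        System.out.println("shared name -> Records sent = " + demo(223, 27, false)); // 250
        System.out.println("unique name -> Records sent = " + demo(223, 27, true));  // 27
    }
}
```

Running `main` prints 250 for the shared name and 27 for the unique name, the same numbers as in the first report; `demo(4529, 803, false)` likewise yields 5,332. In real Flink code the fix is simply the `.name(...)` call discussed above.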