Mis-reported metrics using SideOutput stream + Broadcast

Cliff Resnick
Our topology has a metadata source that we push via Broadcast. Because this metadata source is critical but sometimes late, we added a buffering mechanism via a SideOutput. We call the initial lookup against the Broadcast "join", and the secondary, state-backed buffered lookup "late-join".
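A minimal, Flink-free sketch of the join / late-join flow described above, assuming each event carries a metadata id and the metadata is a simple id-to-value map. `Event`, `Joined`, `JoinModel` and `LateJoinBuffer` are hypothetical names used only for illustration. In the real job the first step would presumably be a BroadcastProcessFunction that emits misses to the side output, and the second a KeyedBroadcastProcessFunction holding keyed state; the TTL timers are omitted here.

```scala
import scala.collection.mutable

// Hypothetical record types for this sketch only.
final case class Event(metadataId: String, payload: String)
final case class Joined(event: Event, metadata: String)

object JoinModel {
  // "join": look the event up in the broadcast metadata seen so far.
  // A hit goes to the main output (Right); a miss goes to the side output (Left).
  def joinStep(metadata: Map[String, String], e: Event): Either[Event, Joined] =
    metadata.get(e.metadataId) match {
      case Some(m) => Right(Joined(e, m))
      case None    => Left(e)
    }
}

// "late-join": buffer side-output events per metadata id until that metadata arrives.
final class LateJoinBuffer {
  private val buffered = mutable.Map.empty[String, Vector[Event]]

  def onEvent(e: Event): Unit =
    buffered.update(e.metadataId, buffered.getOrElse(e.metadataId, Vector.empty[Event]) :+ e)

  // When late metadata shows up, emit everything buffered under that id and clear it.
  def onMetadata(id: String, m: String): Vector[Joined] = {
    val ready = buffered.getOrElse(id, Vector.empty[Event]).map(Joined(_, m))
    buffered -= id
    ready
  }

  // Number of events still waiting for metadata.
  def pending: Int = buffered.values.map(_.size).sum
}
```

An event whose metadata is already known is joined immediately; one that misses is buffered by the late-join and released as soon as its metadata arrives.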

Today I noticed that if we implement the late join using a KeyedBroadcastProcessFunction (so we can set TTL timers while using broadcast), everything seems to work. However, even though our internal metrics show the correct numbers, the numbers in the Flink UI falsely indicate that:

1) No broadcast data is sent to the late join, meaning the Flink metrics for the metadata operator do not show any extra records sent.
2) Primary Join's main stream (not the Side Output) is shown as being sent to Late Join, meaning the Flink input record count for Late Join matches Primary Join's main output, even though our logs and internal metrics might show zero traffic on that path.

If I do the late join via CoProcessFunction using a metadata keyed stream instead of broadcast, then the Flink UI shows the correct numbers (unfortunately there is another side issue when we take that tack but I won't go into that here).

I hope this was not too confusing. Again, the issue is not that this does not work; it just looks like it does not work in the Flink UI.

Below is the approximate code. Perhaps I'm doing something wrong that causes the weird reporting? 
val metadata = MetadataTable
  .streamFromKafka(env)
val broadcast = createBroadcast(metadata)
val metadataJoined = sourceTables
  .union(source1Tables)
  .union(source2Tables)
  .connect(broadcast)
  .process(BroadcastMetadataJoin()) // this operator will send side output data using Metadata.sideOutputTag
  .name("join")

val lateJoined = metadataJoined
  .getSideOutput(Metadata.sideOutputTag)
  .keyBy(_.primaryKey.getMetadataId)
  .connect(KeyedBroadcastMetadataJoin.broadcast(metadata))
  .process(KeyedBroadcastMetadataJoin())
  .name("late-join")

Re: Mis-reported metrics using SideOutput stream + Broadcast

Chesnay Schepler
Let's see if I understood everything correctly:

1)
Let's say that metadata contains N records.

The UI output metrics indicate that metadata sends N records.
The UI input metrics for join and late-join each include N records (i.e., N + whatever other data they receive).

You expected the output of metadata to be 2*N, since the records are broadcast to 2 operators.

If so, then the metrics work as intended; they count the number of records that the operator emits; the duplication happens behind the scenes somewhere outside the operator. In other words, the metric counts the number of Collector#collect() calls.
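To make the counting rule concrete, here is a toy model (plain Scala, not Flink internals) in which the output counter is incremented once per collect() call and each record is then fanned out to two consumers. `Counter`, `BroadcastOutput` and `BroadcastCountDemo` are hypothetical names for this sketch.

```scala
// Hypothetical counter standing in for a records-out / records-in metric.
final class Counter {
  var count: Long = 0L
  def inc(): Unit = count += 1
}

// Hypothetical output that counts once per collect() call, then fans out.
final class BroadcastOutput[T](downstream: Seq[T => Unit]) {
  val recordsOut = new Counter

  def collect(record: T): Unit = {
    recordsOut.inc()                   // counted once per collect() call...
    downstream.foreach(f => f(record)) // ...then replicated to every consumer
  }
}

object BroadcastCountDemo {
  // Returns (metadata records out, join records in, late-join records in).
  def run(n: Int): (Long, Long, Long) = {
    val joinIn = new Counter
    val lateJoinIn = new Counter
    val out = new BroadcastOutput[Int](
      Seq((_: Int) => joinIn.inc(), (_: Int) => lateJoinIn.inc())
    )
    (1 to n).foreach(i => out.collect(i))
    (out.recordsOut.count, joinIn.count, lateJoinIn.count)
  }
}
```

With N = 5 the model reports 5 records out of metadata and 5 records into each of join and late-join, rather than 2*N out of metadata.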

2)
Let's say that join emits M records via the main output, and S records via the side-output.

The UI input metrics for late-join indicate that M records have been received.

You expected the input for late-join to be S + N instead, i.e., the side output plus the broadcast data (see 1).

If so, then yeah that's weird and shouldn't happen.
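A toy sketch of the expectation in point 2, assuming the join's emissions can be tagged as main or side output (the `Emitted`, `Main`, `Side` and `SideOutputDemo` names are hypothetical): a consumer attached via getSideOutput should count only the S side-output records plus the N broadcast records, never the M main-output records.

```scala
// Hypothetical tags for what the join operator emits.
sealed trait Emitted
final case class Main(value: String) extends Emitted
final case class Side(value: String) extends Emitted

object SideOutputDemo {
  // Expected late-join input: only the side-output records (S)
  // plus the records arriving on its broadcast input (N).
  def lateJoinInputCount(joinOutput: Seq[Emitted], broadcastRecords: Int): Int = {
    val sideOnly = joinOutput.collect { case Side(v) => v }
    sideOnly.size + broadcastRecords
  }
}
```

With M = 3 main records, S = 2 side records and N = 4 broadcast records, the expected late-join input is 6, not 3.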

For clarification:
You use the broadcast variable for the join operator, but KeyedBroadcastMetadataJoin.broadcast(metadata) for the late-join.
Is this intended, or just a copy&paste error?

On 03.07.2018 04:16, Cliff Resnick wrote:

Re: Mis-reported metrics using SideOutput stream + Broadcast

Cliff Resnick
I found the problem and, of course, it was between my desk and my chair. For side outputs, the Flink UI correctly reports (S+N) * Slots. Since the CoProcessFunction late-join was hashed, that came out to S+N. Thanks for the help!
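A simplified sketch of the partitioning effect behind this: a hash-partitioned (keyed) stream delivers each record to exactly one parallel instance, while a broadcast stream delivers every record to every instance. The `PartitioningDemo` object is hypothetical, and the `hashCode`-modulo routing is a stand-in assumption; Flink's keyBy actually assigns keys via key groups, but the one-instance-per-record property is the same.

```scala
object PartitioningDemo {
  // Per-instance input counts when `keys` are hash-partitioned over `parallelism` instances.
  def keyedCounts(keys: Seq[String], parallelism: Int): Vector[Int] = {
    val counts = Array.fill(parallelism)(0)
    keys.foreach(k => counts(Math.floorMod(k.hashCode, parallelism)) += 1)
    counts.toVector
  }

  // Per-instance input counts when `n` records are broadcast to `parallelism` instances.
  def broadcastCounts(n: Int, parallelism: Int): Vector[Int] =
    Vector.fill(parallelism)(n)
}
```

Summed over 4 instances, 5 keyed records still total 5, while 3 broadcast records total 12. That is why the input reported for the broadcast-based late-join grows with parallelism and the hashed CoProcessFunction version does not.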

On Tue, Jul 3, 2018 at 3:51 AM, Chesnay Schepler <[hidden email]> wrote: