behavior differences when sinking side outputs to a custom KeyedCoProcessFunction

Jin Yi
Hello,

Sorry for the long post, but this is a puzzling problem, and I'm enough of a Flink non-expert that I'm unsure which details matter.

Background:
I have a Flink pipeline made up of a series of custom "joins" used for user event "flattening". For these I wrote a custom KeyedCoProcessFunction that joins the two connected streams on a parent id, matching the "left" event's primary key against the right event's foreign key. If the right (child) event has no foreign key, the function instead tries to infer the join, using heuristics to narrow down the possible parent events and picking the temporally closest one. Both the inference and the state cleanup for these joins happen in the onTimer method.

Everything is based on event time, and I'm using the Kafka connector as the input source for the right-hand inputs to these operators. Here's what the pipeline looks like; the joins in question form a chain in which the output of the previous join (left input) is joined with a new raw event source (right input):

[Pipeline diagram: Screen Shot 2021-05-20 at 3.12.22 PM.png]

Each of these join functions has a time window (a duration or interval) that defines how long join state is kept and how wide the inference window is. It is set per operator so that id-based joins can have separate in-order and out-of-order thresholds, and the same window bounds the inference performed when a right event that is an inference candidate (missing its foreign key id) is about to be evicted from state.
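To make the role of these durations concrete, here is a minimal plain-Java model (no Flink dependency) of the id-based half of such a join. It is a sketch under stated assumptions, not the author's code: the names `Event` and `IdJoinSimulator`, the single `outOfOrderMillis` duration, and the rule "an unmatched right event is evicted once the watermark reaches its timestamp plus that duration" are all hypothetical. A `HashMap` stands in for keyed state, a `TreeMap` stands in for event-time timers, and the `dropped` list stands in for the dropped-events side output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical event type: `id` is the primary key for left events
// and the foreign key for right events.
final class Event {
    final String id;
    final long timestamp; // event time in milliseconds

    Event(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

// Plain-Java model (hypothetical) of an id-based join with timer-driven eviction.
final class IdJoinSimulator {
    private final long outOfOrderMillis; // how long an unmatched right event is kept
    private final Map<String, Event> leftById = new HashMap<>();
    private final Map<String, List<Event>> pendingRight = new HashMap<>();
    // Timer time -> right events whose eviction is scheduled at that time.
    private final TreeMap<Long, List<Event>> timers = new TreeMap<>();

    final List<String> joined = new ArrayList<>();
    final List<Event> dropped = new ArrayList<>(); // stands in for a side output

    IdJoinSimulator(long outOfOrderMillis) {
        this.outOfOrderMillis = outOfOrderMillis;
    }

    // Left input: remember the parent and complete any right events waiting for it.
    void processLeft(Event left) {
        leftById.put(left.id, left);
        List<Event> waiting = pendingRight.remove(left.id);
        if (waiting != null) {
            for (Event right : waiting) {
                joined.add(left.id + "@" + right.timestamp);
            }
        }
    }

    // Right input: join immediately if the parent is already known; otherwise
    // buffer the event and schedule an eviction timer for it.
    void processRight(Event right) {
        if (leftById.containsKey(right.id)) {
            joined.add(right.id + "@" + right.timestamp);
            return;
        }
        pendingRight.computeIfAbsent(right.id, k -> new ArrayList<>()).add(right);
        timers.computeIfAbsent(right.timestamp + outOfOrderMillis, k -> new ArrayList<>())
              .add(right);
    }

    // Fire every timer whose time is <= the watermark (the event-time timer rule)
    // and move right events that are still unmatched to `dropped`.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (Event right : timers.pollFirstEntry().getValue()) {
                List<Event> waiting = pendingRight.get(right.id);
                if (waiting != null && waiting.remove(right)) {
                    dropped.add(right);
                    if (waiting.isEmpty()) {
                        pendingRight.remove(right.id);
                    }
                }
            }
        }
    }
}
```

Left-state cleanup and the inference path are omitted. Shrinking `outOfOrderMillis` moves every eviction timer earlier, so more right events can hit their timer before their parent shows up, which matches the expected "tighter window, more drops" trend.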

Problem:

I have side outputs for duplicate, late, and dropped events. I'm focusing on the dropped events, since events that go unmatched are dropped when they are evicted from state. Only right-hand-side (RHS) events get dropped; RHS events with foreign keys are dropped when they go unmatched (the left event arrives late or not at all, or there is no valid inferred left event). With wide enough in-order and out-of-order durations, everything gets matched. However, while testing, I observed (as expected) that the number of dropped events increases as these durations make the join window tighter. Great, this makes sense. To better understand the impact of these durations, I wrote our integration/stress test case to focus on just id-key-based joins as a start.

Further, to help observe the dropping behavior, I connected the side outputs to some S3 sinks that store the dropped events. Initially, the dropped right events were written correctly to the S3 output; in the integration/stress test setup, they start to appear once the durations are below 1 minute.

However, I noticed that they didn't include the Flink Context.timestamp anywhere in the event structure (the left events were already getting the timestamp set in the processElement1 method). I wanted this information to see how event-time processing worked in practice, so I made a similarly simple change in processElement2 to set the timestamp on the right events as they came in.

Once I did this, nothing was dropped anymore and everything joined, even with the durations set as low as 1 second on either side. Wut?

If I comment out that single line of code that sets the right-hand-side event timestamp, I get the dropped events again (without the Flink timestamp). If I uncomment it to put the timestamps back, everything fully matches again.

What is going on? It feels like a Heisenberg effect on the "touched" right events.

Any help here would be greatly appreciated.



Once I made this change, all the joins

Re: behavior differences when sinking side outputs to a custom KeyedCoProcessFunction

Jin Yi
(Sorry that the last sentence fragment made it into my email; it was a draft comment I forgot to remove. My thoughts in the first message are complete.)

I do have follow-up questions/thoughts for this thread, though. Given my current setup, and given how event-time watermarks and Kafka-connector-generated watermarks should work, the behavior I see when I touch the right events actually seems to be the expected one. From what I've read, a two-input operator fires its timers based on the minimum of its left and right input watermarks. So my custom KeyedCoProcessFunction's onTimer should fire only when the slower of the two input watermarks reaches the timer's time, correct? Then right-stream events being dropped prematurely based on the right watermark alone (which I originally thought made logical sense) would be the incorrect behavior?
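The min-of-inputs rule described above can be modelled in a few lines of plain Java. This is a sketch, not Flink source: the class `TwoInputWatermarkTracker` and its methods are hypothetical, but the logic follows Flink's documented behavior that an operator with two inputs takes the minimum of its input watermarks as its current event time, and that an event-time timer fires once that combined watermark reaches the timer's timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model (hypothetical) of watermark combination in a two-input operator.
final class TwoInputWatermarkTracker {
    private long input1Watermark = Long.MIN_VALUE; // left input
    private long input2Watermark = Long.MIN_VALUE; // right input
    private long combinedWatermark = Long.MIN_VALUE;
    private final TreeSet<Long> timers = new TreeSet<>();

    final List<Long> firedTimers = new ArrayList<>();

    void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    // Per-input watermarks only move forward.
    void onLeftWatermark(long watermark) {
        input1Watermark = Math.max(input1Watermark, watermark);
        advance();
    }

    void onRightWatermark(long watermark) {
        input2Watermark = Math.max(input2Watermark, watermark);
        advance();
    }

    long currentWatermark() {
        return combinedWatermark;
    }

    // The operator advances only to the slower of the two inputs, then fires
    // every timer whose time is <= that combined watermark.
    private void advance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            while (!timers.isEmpty() && timers.first() <= combinedWatermark) {
                firedTimers.add(timers.pollFirst());
            }
        }
    }
}
```

In this model, a right watermark far ahead of the left one cannot fire a timer on its own; the timer fires only once the left watermark catches up. A timer that fires "too early" therefore points at the watermarks being fed in rather than at the operator.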

Should I file an issue/bug?

In reply to: Jin Yi, Thu, May 20, 2021 at 3:39 PM

Re: behavior differences when sinking side outputs to a custom KeyedCoProcessFunction

Jin Yi
Just to resolve this thread: I figured out the issue. We use a local version of the watermark strategy when running locally for development, and it didn't work correctly when many events had the same timestamp, which the fake data generation for local runs tends to produce. Fixing the local watermark generator used in that strategy to handle this properly fixed all of my issues.
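The thread doesn't show the broken local generator, so the following is only a hypothetical illustration of how runs of identical timestamps can trip up a watermark generator. In Flink, a watermark W asserts that no more elements with a timestamp <= W should arrive, which is why Flink's built-in bounded-out-of-orderness watermarks are `maxTimestamp - outOfOrdernessMillis - 1`. A generator that instead emits the maximum timestamp itself turns every further event carrying that same timestamp into a late event. The plain-Java model below (class and method names are hypothetical) counts late events under both rules; as a simplification it emits a watermark after every event, whereas Flink's built-in generators emit periodically.

```java
// Plain-Java model (hypothetical) comparing two watermark rules.
final class WatermarkModel {
    // Hypothetical buggy rule: watermark = highest timestamp seen so far.
    static long naiveWatermark(long maxTimestamp) {
        return maxTimestamp;
    }

    // Rule used by Flink's bounded-out-of-orderness watermarks: max - bound - 1.
    static long boundedWatermark(long maxTimestamp, long outOfOrdernessMillis) {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // An element is late if its timestamp is <= the current watermark.
    static boolean isLate(long timestamp, long watermark) {
        return timestamp <= watermark;
    }

    // Replays the timestamps in order, updating the watermark after each event,
    // and counts how many events arrive at or behind the watermark.
    static int countLate(long[] timestamps, boolean naive, long outOfOrdernessMillis) {
        long maxTimestamp = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (long ts : timestamps) {
            if (isLate(ts, watermark)) {
                late++;
            }
            maxTimestamp = Math.max(maxTimestamp, ts);
            watermark = naive
                    ? naiveWatermark(maxTimestamp)
                    : boundedWatermark(maxTimestamp, outOfOrdernessMillis);
        }
        return late;
    }
}
```

For the timestamps `{1000, 1000, 1000, 1001}`, the naive rule marks the second and third events as late, while the "minus one" rule with a bound of 0 marks none. A bound of 0 corresponds to what Flink's `WatermarkStrategy.forMonotonousTimestamps()` does, and `WatermarkStrategy.forBoundedOutOfOrderness(Duration)` adds a larger bound on top.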

In reply to: Jin Yi, Fri, May 21, 2021 at 10:09 AM

Re: behavior differences when sinking side outputs to a custom KeyedCoProcessFunction

Arvid Heise
Hi Jin,

As you have figured out, when something goes wrong with watermarks, it's usually because of the watermark generator. (Sorry that you didn't receive any feedback at all.)

Thank you very much for sharing your solution!

In reply to: Jin Yi, Thu, Jun 3, 2021 at 8:51 PM