post-checkpoint watermark out of sync with event stream?

Cliff Resnick
We have an event-time pipeline that uses a ProcessFunction to accept events with an allowed lateness of a number of days. We use a BoundedOutOfOrdernessTimestampExtractor, and our event stream has a long tail that occasionally exceeds our allowed lateness, in which case we drop the events.

The logic is simple: 
1. On each element, we compare (the element's event time + allowed lateness) against the current watermark.
2. If the element is within bounds, we register a timer for (the element's event time + allowed lateness). We call this "endTime".
3. During the above window, we collect and assimilate all the data for the key and regularly UPSERT it to a data store.
4. In onTimer for the above "endTime", we clear the state for the key.
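The four steps above can be modelled without Flink to make the drop condition concrete. This is a hypothetical sketch: `LatenessModel` and its methods are illustrative names, not Flink API. Keyed state and event-time timers are replaced by plain maps, the UPSERT is omitted, and "assimilate" is assumed to mean summing values.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical plain-Java model of steps 1-4; NOT Flink API code.
// Keyed state and event-time timers are replaced by ordinary maps.
class LatenessModel {
    final long allowedLateness;
    long currentWatermark = Long.MIN_VALUE;                     // operators start at -Inf
    final Map<String, Long> state = new HashMap<>();            // key -> assimilated value
    final TreeMap<Long, Set<String>> timers = new TreeMap<>();  // endTime -> keys

    LatenessModel(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    // Steps 1-3: drop if endTime is already behind the watermark, else assimilate.
    boolean processElement(String key, long eventTime, long value) {
        long endTime = eventTime + allowedLateness;
        if (endTime <= currentWatermark) {
            return false; // beyond allowed lateness: dropped
        }
        state.merge(key, value, Long::sum); // "assimilate" (the UPSERT is omitted)
        timers.computeIfAbsent(endTime, t -> new HashSet<>()).add(key);
        return true;
    }

    // Step 4: advancing the watermark fires every timer with endTime <= watermark.
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark); // monotonic within one run
        while (!timers.isEmpty() && timers.firstKey() <= currentWatermark) {
            for (String key : timers.pollFirstEntry().getValue()) {
                state.remove(key); // onTimer: clear state for the key
            }
        }
    }

    public static void main(String[] args) {
        LatenessModel m = new LatenessModel(10);
        System.out.println(m.processElement("k", 100, 1)); // true (endTime = 110)
        m.advanceWatermark(120);                           // timer at 110 fires, state cleared
        System.out.println(m.state.containsKey("k"));      // false
        System.out.println(m.processElement("k", 105, 5)); // false (115 <= 120)
    }
}
```

Within a single run the watermark only moves forward, so once the timer has purged a key, any later element for it whose endTime is at or behind the watermark is rejected.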

The above has worked well for the past 1-2 years. Last week, however, a bug introduced DEBUG logging into the job config, and this caused several failures/restarts (S3 DEBUG logging is extremely verbose!). Within a day or two, our monitoring system restarted the pipeline several times, sometimes from a Savepoint over an hour or two old. For some reason, during this period we noticed that a few long-tail records that should have been dropped made it into our data store. These records did not contain assimilated Flink state, meaning they passed through after the endTime key purge (step 4) and compromised the data store by replacing the assimilated values with tail-end values.

I'm wondering how this could be possible. The only explanation I can think of is:

4. On the "endTime" timer, key state is purged.
5. --- job fails ---
6. Job restarted from a 2.5-hour-old Savepoint.
7. Watermark regresses (?) from the "endTime" watermark.
8. A long-tail event squeaks through under the temporarily backdated watermark.
9. Data store data for the key is replaced with long-tail data.

Is the above possible, or perhaps there is another possible scenario? Any opinions appreciated!

-Cliff

Re: post-checkpoint watermark out of sync with event stream?

Aljoscha Krettek
Hi Cliff,

On 14.04.20 19:29, Cliff Resnick wrote:

> I'm wondering how this could be possible. The only explanation I can think
> of is:
>
> 4. on "endTime" timer key state is purged.
> 5 --- job fail ---
> 6.  job restarted on 2.5 hour old Savepoint
> 7.  watermark regresses (?) from "endTime" watermark.
> 8. a long tail event squeaks through under temporarily backdated watermark
> 9. data store data for key is replaced with long tail data,
>
> Is the above possible, or perhaps there is another possible scenario? Any
> opinions appreciated!
Yes, I'm quite sure this is possible. The reason is that watermarks are
*not* checkpointed at operators but are purely driven by data that comes
through. When a pipeline is (re)started all operators will have -Inf as
the current watermark, even when starting from a checkpoint/savepoint.
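A minimal sketch of why this lets the long-tail record through, assuming (as described above) that a restore brings back keyed state but not the watermark. `RestartModel` is a hypothetical class, not Flink API; timers are left out, and the sketch assumes the "endTime" timer for key `"k"` had already fired and purged its state before the Savepoint was taken.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a restore: keyed state comes back, the watermark does not.
class RestartModel {
    static final long ALLOWED_LATENESS = 10;

    long watermark = Long.MIN_VALUE;              // every (re)start begins at -Inf
    final Map<String, Long> state = new HashMap<>();

    // Same lateness check as steps 1-3 of the original question.
    boolean accept(String key, long eventTime, long value) {
        if (eventTime + ALLOWED_LATENESS <= watermark) {
            return false; // too late: dropped
        }
        state.merge(key, value, Long::sum);
        return true;
    }

    // The snapshot holds keyed state only, mirroring that watermarks are not checkpointed.
    Map<String, Long> snapshot() {
        return new HashMap<>(state);
    }

    static RestartModel restore(Map<String, Long> snapshot) {
        RestartModel m = new RestartModel();
        m.state.putAll(snapshot); // watermark stays at Long.MIN_VALUE
        return m;
    }

    public static void main(String[] args) {
        RestartModel before = new RestartModel();
        before.watermark = 120; // assume "k" was already purged by its endTime timer
        System.out.println(before.accept("k", 105, 5)); // false (115 <= 120)

        RestartModel after = RestartModel.restore(before.snapshot());
        System.out.println(after.accept("k", 105, 5));  // true: same event now passes
        System.out.println(after.state.get("k"));       // 5: tail value only
    }
}
```

The window lasts only until the sources emit watermarks again, which matches the "temporarily backdated watermark" in step 8. Because the key's state was already gone, the accepted record carries no assimilated data.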

So far we haven't wanted to checkpoint the watermark, because this would
introduce additional complexity in handling that state. All operators
would be stateful, and it's not trivial (though certainly possible) to
determine the watermark of an operator in a scale in/out scenario.
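To illustrate the scale in/out complication, here is one conservative option, sketched under loud assumptions. It does not describe current Flink behavior. Assume each old subtask stored its watermark in redistributable (non-keyed) state. A new subtask that inherits entries from several old subtasks can then only safely claim the smallest of them. `RestoredWatermark`, `merge` and `effective` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch only: Flink does not checkpoint operator watermarks.
// Assumes each old subtask stored its watermark as redistributable list state,
// and after rescaling a new subtask receives some subset of those entries.
class RestoredWatermark {

    // The only watermark that holds for all inherited partitions is the smallest one.
    static long merge(List<Long> restoredWatermarks) {
        if (restoredWatermarks.isEmpty()) {
            return Long.MIN_VALUE; // nothing restored: fall back to -Inf, as today
        }
        long result = Long.MAX_VALUE;
        for (long wm : restoredWatermarks) {
            result = Math.min(result, wm);
        }
        return result;
    }

    // Use the restored value as a floor so the watermark cannot regress below it
    // while the incoming watermark is still at -Inf after a restart.
    static long effective(long restoredFloor, long incomingWatermark) {
        return Math.max(restoredFloor, incomingWatermark);
    }

    public static void main(String[] args) {
        long floor = merge(Arrays.asList(120L, 95L, 130L));
        System.out.println(floor);                            // 95
        System.out.println(effective(floor, Long.MIN_VALUE)); // 95
        System.out.println(effective(floor, 140L));           // 140
    }
}
```

Taking the minimum errs toward accepting too much rather than dropping on-time data. That conservatism is part of why getting this right across rescaling is not trivial.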

Is this a big problem for you?

Btw, there's this Jira issue about a symptom of this:
https://issues.apache.org/jira/browse/FLINK-5601. You can see that the
discussion fizzled out from my side, mostly because of the mentioned
complexities. But it might be that we need to try and find a solution
for this.

Best,
Aljoscha