Re: Flink Scheduler Customization

Posted by Fabian Hueske-2 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-Scheduler-Customization-tp23514p23529.html

Hi Ananth,

You can certainly do this with Flink, but there are no built-in operators for this.
What you probably want to do is compare the timestamp of the event with the current processing time and drop the record if it is too old.
If the timestamp is encoded in the record, you can do this in a FilterFunction or a FlatMapFunction. If the timestamp is attached as the event-time timestamp, you can access it in a ProcessFunction.
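As a minimal sketch of that staleness check (plain Java, outside Flink's API so it stands alone; the class name, method names, and the 20-minute threshold are illustrative assumptions, not anything from Flink itself). The same predicate would be the body of a FilterFunction's filter() method:

```java
import java.time.Duration;

// Hypothetical staleness check, as one might use inside a Flink
// FilterFunction: keep a record only if its event timestamp is no
// older than maxAge relative to the current processing time.
public class StalenessFilter {
    private final long maxAgeMillis;

    public StalenessFilter(Duration maxAge) {
        this.maxAgeMillis = maxAge.toMillis();
    }

    // Returns true when the record is fresh enough to keep.
    public boolean isFresh(long eventTimestampMillis, long nowMillis) {
        return nowMillis - eventTimestampMillis <= maxAgeMillis;
    }

    public static void main(String[] args) {
        // Assumed threshold: drop anything older than 20 minutes.
        StalenessFilter filter = new StalenessFilter(Duration.ofMinutes(20));
        long now = System.currentTimeMillis();
        System.out.println(filter.isFresh(now - 60_000, now));      // 1 minute old
        System.out.println(filter.isFresh(now - 30 * 60_000, now)); // 30 minutes old
    }
}
```

Note that this uses processing time on purpose: during a catch-up after an outage, event time lags far behind the wall clock, which is exactly the lag the filter measures.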

Best, Fabian

Am Sa., 29. Sep. 2018 um 21:11 Uhr schrieb Ananth Durai <[hidden email]>:

I'm writing a Flink connector to write a stream of events from Kafka to Elasticsearch. It is a typical metrics ingestion pipeline, where the latest metrics are preferred over stale data.
What I mean by that: let's assume there was an outage of the Elasticsearch cluster for about 20 minutes, and all the metrics were backlogged in Kafka during that period. Once ES is available again, the Flink stream will resume from the last checkpointed offset (correctly so) and try to catch up. Instead, is there a way we can customize the Flink stream so that, if it detects a significant backlog, it skips ahead and consumes from the latest offset, and schedules a separate backfill task for the backlogged 20 minutes?

Regards,
Ananth.P,