Hello Flink community,

I am using periodic watermarks and extracting the event time from each record in files on S3. I am using the `TimeLagWatermarkGenerator` as described in the Flink documentation.

Currently, a new watermark is generated from processing time, trailing it by a fixed amount:

    override def getCurrentWatermark: Watermark = {
      new Watermark(System.currentTimeMillis() - maxTimeLag)
    }

This works fine as long as the process is running. However, in case of a failure (for example, bad data or an out-of-memory error), I need to stop the process, and it takes time to get it running again. Suppose maxTimeLag = 3 hours and it takes me 12 hours to notice and fix the problem.

My question: since I am using processing time as part of the watermark, will some records be ignored because of the watermark when Flink resumes after the failure? And what is the best practice for catching up and continuing without losing data?

Thanks!

Best,
Chengzhi
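To make the scenario concrete, here is a minimal sketch in plain Scala with no Flink dependency. It reuses the formula from `getCurrentWatermark` in the post, but takes the clock as a parameter so the result is deterministic. The object name, the helper names, and the sample timestamps are hypothetical and chosen only for illustration. It assumes a record is late when its event time is at or below the current watermark.

```scala
object TimeLagLateness {
  val HourMs: Long = 60L * 60L * 1000L

  // Same formula as getCurrentWatermark in the post, with the wall clock
  // passed in explicitly instead of calling System.currentTimeMillis().
  def timeLagWatermark(nowMs: Long, maxTimeLagMs: Long): Long =
    nowMs - maxTimeLagMs

  // Assumption: a record is late when its event time is at or below the watermark.
  def isLate(eventTimeMs: Long, watermarkMs: Long): Boolean =
    eventTimeMs <= watermarkMs

  def main(args: Array[String]): Unit = {
    val failureAtMs = 100L * HourMs               // hypothetical time the job stopped
    val restartAtMs = failureAtMs + 12L * HourMs  // job is back 12 hours later
    val watermarkMs = timeLagWatermark(restartAtMs, 3L * HourMs)

    // Records written to S3 1, 5, and 11 hours into the outage.
    val backlogMs = Seq(1L, 5L, 11L).map(h => failureAtMs + h * HourMs)
    backlogMs.foreach { ts =>
      println(s"event at hour ${ts / HourMs}: late = ${isLate(ts, watermarkMs)}")
    }
  }
}
```

With these numbers, the first watermark after the restart sits 9 hours after the failure. The records from hours 1 and 5 of the outage are therefore already behind the watermark, and only the record from hour 11 is not.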
Hi Chengzhi,
if you emit a watermark while there is still data with a lower timestamp, you generate "late data". That data either needs to be processed in a separate branch of your pipeline (see sideOutputLateData() [1]) or should force your existing operators to update their previously emitted results. The latter means holding state or the contents of your windows longer (see allowedLateness() [1]).

I think in general a processing-time watermark strategy might not be suitable for reprocessing. Either you parameterize your watermark generator so that you can pass information in through job parameters, or you use another strategy such as BoundedOutOfOrdernessTimestampExtractor [2] together with sinks that allow idempotent updates.

I hope this helps.

Regards,
Timo

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#windows
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html

(In reply to Chengzhi Zhao, 02.04.18 at 23:51.)
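The sketch below shows why an event-time-driven strategy such as the one behind BoundedOutOfOrdernessTimestampExtractor behaves differently after downtime. It is a plain-Scala model, not the Flink class itself. It assumes the watermark is the largest event time seen so far minus a fixed bound, so the watermark advances only as data is read. The object name, `countLate`, and the sample timestamps are hypothetical.

```scala
object BoundedOutOfOrdernessModel {
  val HourMs: Long = 60L * 60L * 1000L

  // Counts records whose event time is at or below the watermark in effect
  // when they arrive. The watermark is (max event time seen) - bound, so it
  // does not depend on the wall clock or on how long the job was down.
  def countLate(eventTimesMs: Seq[Long], maxOutOfOrdernessMs: Long): Int = {
    var maxSeenMs = Long.MinValue + maxOutOfOrdernessMs // start value avoids underflow
    var late = 0
    for (ts <- eventTimesMs) {
      val watermarkMs = maxSeenMs - maxOutOfOrdernessMs
      if (ts <= watermarkMs) late += 1
      maxSeenMs = math.max(maxSeenMs, ts)
    }
    late
  }

  def main(args: Array[String]): Unit = {
    // Backlog read after a restart, slightly out of order (hours 101, 105, 103, 111).
    val backlogMs = Seq(101L, 105L, 103L, 111L).map(_ * HourMs)
    println(s"late records in backlog: ${countLate(backlogMs, 3L * HourMs)}")

    // A record more than 3 hours behind the newest one is still late.
    println(s"late with a straggler: ${countLate(backlogMs :+ 104L * HourMs, 3L * HourMs)}")
  }
}
```

In this model, the backlog produces no late records no matter how long the job was stopped. Only records that fall further behind the newest record than the configured bound are late. Those are the records that sideOutputLateData() or allowedLateness() would have to handle.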
Thanks Timo for your response and the references. I will try BoundedOutOfOrdernessTimestamp.

Also, is there a way to retrieve the last watermark before/after a failure? Then maybe I could persist the watermark to external storage and resume in a separate pipeline?

Best,
Chengzhi

(In reply to Timo Walther, Tue, Apr 3, 2018 at 7:58 AM.)
Hi Chengzhi,

You can access the current watermark from the Context object of a ProcessFunction [1] and store it in operator state [2].

(In reply to Chengzhi Zhao, 2018-04-03 19:39 GMT+02:00.)
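A rough sketch of that suggestion might look like the following. It needs the Flink streaming dependency on the classpath and is written against the 1.4-era API. The class name `WatermarkTracker`, the state name "last-watermark", and the choice to restore the minimum of the recovered values are assumptions made for illustration, not something prescribed in this thread.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Hypothetical pass-through operator that remembers the last watermark it saw.
class WatermarkTracker[T] extends ProcessFunction[T, T] with CheckpointedFunction {

  @transient private var checkpointed: ListState[java.lang.Long] = _
  private var lastWatermark: Long = Long.MinValue

  override def processElement(
      value: T,
      ctx: ProcessFunction[T, T]#Context,
      out: Collector[T]): Unit = {
    // The Context exposes the operator's current watermark via the timer service.
    lastWatermark = ctx.timerService().currentWatermark()
    out.collect(value)
  }

  // Called on every checkpoint: write the latest value into operator state.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointed.clear()
    checkpointed.add(lastWatermark)
  }

  // Called on start and on recovery: register the state and read it back.
  override def initializeState(context: FunctionInitializationContext): Unit = {
    checkpointed = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[java.lang.Long]("last-watermark", classOf[java.lang.Long]))
    if (context.isRestored) {
      // Assumption: after redistribution a subtask may receive several entries;
      // keep the smallest one so no progress is claimed that a subtask lacked.
      var restored = Long.MaxValue
      val it = checkpointed.get().iterator()
      while (it.hasNext) restored = math.min(restored, it.next().longValue())
      if (restored != Long.MaxValue) lastWatermark = restored
    }
  }
}
```

It could be attached with something like `stream.process(new WatermarkTracker[MyEvent])`, where `MyEvent` stands in for the job's record type. Writing the restored value to external storage, as asked above, would be an extra step that this sketch does not cover.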
Thanks Fabian! This is very helpful!

Best,
Chengzhi

(In reply to Fabian Hueske, Wed, Apr 4, 2018 at 9:02 AM.)