A newbie question:
I've created a basic Flink DataStream job for an IoT use case, with a file source and sink for testing. I key by device ID, then in a ProcessFunction I set an event-time timer that fires if a device falls silent (i.e. a timeout), and I cancel it if another message arrives from that device within the timeout.

My test source generates 3 devices, one of which falls silent for longer than the timeout period partway through the stream and then resumes. So I expect the timer to fire for that device during the stream, and then for all the timers to fire after the end of the stream.

The timers do indeed fire at the end of the stream (e.g. with a timeout of 1000 ms, the timers all fire 1000 ms after the end of the stream, which is correct). But no timer fires for the device that falls silent during the stream, even though the other devices are still talking and therefore advancing event time. I've verified that I am keying correctly by ID.

I suspect this has something to do with watermarks. I'm using forBoundedOutOfOrderness watermarking with a duration of 0.

All suggestions welcome, thanks.
-Pilgrim
-- Learn more at https://devicepilot.com @devicepilot +44 7961 125282 See our latest features and book me for a video call.
Based on your description, you aren't doing anything obviously wrong. Would it be possible for you to share the code with us?
On 1/27/2021 1:02 PM, Pilgrim Beart wrote:
You were right that it is an issue with the watermarks: except when the job was stopped, they were never emitted downstream, so no timer was ever triggered.
It appears that you need to set the auto-watermark interval in the ExecutionConfig via
env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(500).toMillis());
to have them emitted periodically.
Alternatively, you could override BoundedOutOfOrdernessWatermarks#onEvent to also emit a watermark for each event (for example, by calling #onPeriodicEmit).
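To make the suggested override concrete, here is a toy model in plain Java. It has no Flink dependency, so it is not Flink's API. It reproduces the arithmetic that, as far as I know, Flink 1.11's BoundedOutOfOrdernessWatermarks uses: the watermark is the largest timestamp seen, minus the bound, minus 1. The class and method names are hypothetical. Calling onPeriodicEmit from onEvent means a watermark follows every element.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Flink code: bounded-out-of-orderness watermark arithmetic with the
// suggested override that calls onPeriodicEmit from onEvent.
class PerEventBoundedWatermarks {
    private final long boundMillis;
    private long maxTimestamp;
    private final List<Long> emitted = new ArrayList<>();

    PerEventBoundedWatermarks(long boundMillis) {
        this.boundMillis = boundMillis;
        // Start low enough that the first computed watermark cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // Called for every element: track the max timestamp, then emit immediately.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        onPeriodicEmit();
    }

    // Normally invoked only when the processing-time interval elapses.
    void onPeriodicEmit() {
        emitted.add(maxTimestamp - boundMillis - 1);
    }

    List<Long> emitted() {
        return emitted;
    }

    // Convenience helper: the watermarks emitted for a sequence of event timestamps.
    static List<Long> watermarksFor(long boundMillis, long... timestamps) {
        PerEventBoundedWatermarks gen = new PerEventBoundedWatermarks(boundMillis);
        for (long ts : timestamps) {
            gen.onEvent(ts);
        }
        return gen.emitted();
    }
}
```

With a bound of 0, an element at 1000 only lifts the watermark to 999. A timer registered for 1000 therefore cannot fire until a later element arrives.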
Put another way, if you use any of the built-in WatermarkGenerators and use event time, then it appears that you must set this interval.
This behavior is...less than ideal, I must admit, and it does not appear to be properly documented.
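The next toy model (plain Java, hypothetical names, not Flink code) shows why a short test job may see no watermarks at all. Periodic emission is driven by processing time. If the whole input is read before the first interval elapses, the only watermark is the final one at the end of the input. The assumption that each element takes 1 ms of processing time is purely illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Flink code. Assumptions: one element is read per simulated
// millisecond of processing time, and the generator is asked for a watermark only
// every intervalMillis of processing time. intervalMillis <= 0 means
// "periodic emission disabled".
class PeriodicEmissionModel {
    static List<Long> emittedWatermarks(long intervalMillis, long boundMillis, long... eventTimestamps) {
        List<Long> emitted = new ArrayList<>();
        long maxTs = Long.MIN_VALUE + boundMillis + 1;
        long processingTime = 0;
        for (long ts : eventTimestamps) {
            maxTs = Math.max(maxTs, ts);
            processingTime++; // one element per simulated millisecond
            if (intervalMillis > 0 && processingTime % intervalMillis == 0) {
                emitted.add(maxTs - boundMillis - 1);
            }
        }
        // End of a bounded input: a final Long.MAX_VALUE watermark fires every
        // remaining event-time timer at once.
        emitted.add(Long.MAX_VALUE);
        return emitted;
    }
}
```

Consider a 200 ms interval, which as far as I know is the value setStreamTimeCharacteristic(TimeCharacteristic.EventTime) configures in Flink 1.11. In this model, a three-element input never triggers a periodic emission, so every timer waits for the final watermark.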
On 1/27/2021 1:56 PM, Chesnay Schepler wrote:
Chesnay,
Thanks for this - I've made the change you suggested (setAutoWatermarkInterval), but it hasn't changed the behaviour: timers still get processed only at the end of the stream.
I have pushed a new version with this change, which also emits some information in a .log field. If you search for "!!!" in Ingest.java and DPTimeoutFunction.java, you'll see the relevant changes.
In DPTimeoutFunction you'll see that if I add code to say "cancel the timer only if it wouldn't have gone off", then the output is now correct - individual devices do time out. However, this output only appears at the end of the stream (i.e. time jumps backwards as all the timers are processed), so I still don't appear to be seeing timer processing at the correct event time. If there were no end of stream, I would never get any timeouts.
Below is the output I get when I run it. This output is correct, but:
a) only because I am manually cancelling timers in DPTimeoutFunction (search for "!!!");
b) the timer events are timestamped correctly but are not emitted into the stream at the right time, and if the stream didn't end then no timeouts would ever occur (which in particular means that devices that never come back online will never be marked as offline).
Perhaps I do need to implement an onPeriodicEmit function? Does that require a custom watermark strategy? I can see how to define a custom watermark generator at the link below, but it's unclear to me how to install it:
https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy
{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 5000 Cancelling previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 5000 "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 7000 "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
{"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
{"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
{"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
{"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
{"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
{"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
{"ts":1001,"id":"2","is_online":false} // These are the "going offline" events that we want to see. But they are emitted only once the stream has ended.
{"ts":5001,"id":"1","is_online":false}
{"ts":11001,"id":"1","is_online":false}
{"ts":11001,"id":"0","is_online":false}
{"ts":11001,"id":"2","is_online":false}
Thanks,
-Pilgrim
On Wed, 27 Jan 2021 at 14:09, Chesnay Schepler wrote:
On 2021/01/27 15:09, Chesnay Schepler wrote:
>Put another way, if you use any of the built-in WatermarkGenerators and
>use event-time, then it appears that you *must* set this interval.
>
>This behavior is...less than ideal I must admit, and it does not
>appear to be properly documented.

Setting the watermark interval is done when calling `env.setStreamTimeCharacteristic()`. It's the first thing we documented for working with event time [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_time.html

To me it was always a usability problem that we didn't have event time enabled by default. We didn't have this because of "performance considerations". This changed in Flink 1.12 [2].

[2] https://issues.apache.org/jira/browse/FLINK-19317

@Pilgrim: Which version of Flink are you using?
I am calling env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
I am using Flink 1.11 (because I need to run on AWS Kinesis Data Analytics).
-Pilgrim
On Wed, 27 Jan 2021 at 17:07, Aljoscha Krettek wrote:
My bad, I was still using the custom WatermarkStrategy that emits a watermark for each event:
    .assignTimestampsAndWatermarks(
        new WatermarkStrategy<T>() {
            @Override
            public WatermarkGenerator<T> createWatermarkGenerator(
                    WatermarkGeneratorSupplier.Context context) {
                return new BoundedOutOfOrdernessWatermarks<T>(Duration.ofSeconds(1)) {
                    @Override
                    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
                        super.onEvent(event, eventTimestamp, output);
                        // Emit a watermark after every event, not only on the periodic interval.
                        super.onPeriodicEmit(output);
                    }
                };
            }
        }
        .withTimestampAssigner(...))
@Aljoscha This is about Flink 1.11.
Since periodic watermarks depend on processing time, am I correct in assuming that if the job finishes quickly, watermarks may never be emitted (except for the one at the end of the job)? Is there any way to emit periodic watermarks based on event time?
Is there any way to enable punctuated
watermarks for the existing watermark strategies without having to
implement a custom one?
On 1/27/2021 5:57 PM, Pilgrim Beart wrote:
Note that while this does fix the issue
of timers not firing while the job is running, it seems to be
firing too many timers...
On 1/27/2021 6:49 PM, Chesnay Schepler wrote:
Actually, if the parallelism is 1, then it works as it should. Sigh...
On 1/27/2021 6:52 PM, Chesnay Schepler wrote:
Chesnay,
I cannot reproduce this - I've tried the approaches you suggested, but nothing I've done makes the timers fire at the correct time in the stream; they only fire when the stream has ended. If you have an EventTime example where they fire at the right time in the stream, I'd love to see it. Or any ideas for other things to try? Could it perhaps be related to using a file source?
Thanks,
-Pilgrim
On Wed, 27 Jan 2021 at 17:55, Chesnay Schepler wrote:
Scratch that - your WatermarkStrategy DOES work (when I implement it correctly!). Well, almost: as you can see below (code pushed to the repo), the timer events still appear somewhat late in the stream, 4 events late in this case. That may be good enough for my purposes, though it will make building test cases painful, so any ideas on how I could fix it would be much appreciated.
{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 5000 Cancelled previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 5000 Cancelled previous timer. "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelled previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelled previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":5000,"id":"1","is_online":false,"log":"timestamp is 5000"}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":11000,"id":"1","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"2","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"0","is_online":false,"log":"timestamp is 11000"}
-Pilgrim
On Thu, 28 Jan 2021 at 08:37, Pilgrim Beart wrote:
I'm not sure I see the problem in your output.
For any given key the timestamps are in order, and the events where devices go offline seem to occur at the right time. Is it just that you'd like the following line to appear earlier in the output?
{"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}
If so, then I'd just partition the output by key and evaluate each key individually.
On 1/28/2021 9:53 AM, Pilgrim Beart wrote:
Chesnay,
1) Correct, I'd like the timeout event (generated at eventTime==1000) to appear in its correct time sequence in the output, i.e. before eventTime exceeds 1000.
It's great that Flink can deal with out-of-orderness, but I didn't expect it to spontaneously create it (especially with parallelism==1). In the previous case the timeout is emitted late by 2 seconds (4 events). So I was wondering: how late could it be? I dialled down the Duration of the BoundedOutOfOrderness WatermarkGenerator to 0, and the timeout now appears only slightly late, as in the log output below. By inserting extra timestamps, I've demonstrated that it is "1 event" late rather than "1 second" late. It's as if the watermark generator realises that time is advancing, so it triggers the timeout, but only after emitting the event that advanced time? At least it now feels deterministic, although relying on the presence of that "forcing" event seems non-ideal: if there just happens not to be one, due to a gap in the other ID streams, we'll get unbounded latency in our timeouts, which means we can't offer downstream systems any out-of-orderness guarantee.
2) Apart from that unbounded-latency concern, it's a fair point that if I'm going to partition the output by ID anyway, this isn't a huge problem.
3) Is there any negative effect of setting the BoundedOutOfOrderness duration to 0? Does it somehow make Flink less efficient?
4) In a subsequent stage, we want to do time-window aggregation (but only within, not across, IDs). Setting the watermark duration to 0 will make the window emit immediately. But we want data that arrives less than 2 minutes late not to be considered late, i.e. don't emit any window until the latest event time is at least 2 minutes after the window end time. Is it possible to set watermark strategies separately per processing stage?
Thanks again for all your very helpful responses,
{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"} // <----- Arrives one event too late
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}
-Pilgrim
On Thu, 28 Jan 2021 at 12:34, Chesnay Schepler wrote:
1) Outside of small-scale tests, the periodic emission of watermarks should keep the latency bounded. Your test just runs so quickly that the periodic emission never triggers.
As for the triggering element being emitted first: this happens because watermarks are not really attached to elements.
The watermark generator accepts an element, determines the watermark, outputs the element, and then outputs the watermark.
In other words, watermarks trail the "normal" elements in the stream.
Hence the order of operations is: the element arrives at your ProcessFunction and is emitted; then the watermark arrives at your ProcessFunction, advances event time, and the timers fire.
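The order of operations in 1) can be reproduced with a small single-threaded toy model (plain Java, hypothetical names, not Flink code). It assumes three things: parallelism 1, a watermark after every element, and a timeout function that replaces a key's pending timer on each new element. Fed the first few inputs from the logs posted in this thread, it places the device-2 timeout at the same point those logs show. With a bound of 0, that is just after the first ts 2000 element. With a bound of 1000 ms, it is just after the first ts 3000 element.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Flink code: element first, then the trailing watermark, then timers.
class TimeoutOrderingModel {
    static List<String> run(long timeoutMillis, long boundMillis, long[] timestamps, String[] ids) {
        List<String> out = new ArrayList<>();
        Map<String, Long> timerByKey = new HashMap<>();
        long maxTs = Long.MIN_VALUE + boundMillis + 1;

        for (int i = 0; i < timestamps.length; i++) {
            // 1) The element reaches the process function and is emitted.
            out.add(ids[i] + "@" + timestamps[i]);
            // Replacing the entry models "cancel the old timer, register a new one".
            timerByKey.put(ids[i], timestamps[i] + timeoutMillis);

            // 2) Only then does the watermark trailing this element arrive.
            maxTs = Math.max(maxTs, timestamps[i]);
            long watermark = maxTs - boundMillis - 1;

            // 3) Every timer at or behind the watermark fires, earliest first.
            List<String> dueKeys = new ArrayList<>();
            for (Map.Entry<String, Long> e : timerByKey.entrySet()) {
                if (e.getValue() <= watermark) {
                    dueKeys.add(e.getKey());
                }
            }
            dueKeys.sort(Comparator.comparing((String k) -> timerByKey.get(k))
                    .thenComparing(Comparator.naturalOrder()));
            for (String key : dueKeys) {
                out.add(key + " offline@" + timerByKey.remove(key));
            }
        }
        return out;
    }
}
```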
3) The OutOfOrderness parameter is a trade-off between memory usage, latency, and tolerance with regard to your input source.
The higher the OutOfOrderness, the longer Flink has to keep window state around (because another element could still arrive), and consequently the more windows can be active at once.
As such, the lower this value, the fewer resources Flink will consume.
You want this value to be as low as possible while still handling as much of the out-of-order data as your use case requires.
If you can guarantee that all input elements are sorted by their timestamp, then setting it to 0 is OK.
4) You can set the allowed lateness individually for each window operation.
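The two knobs from 3) and 4) can be separated in a toy model (plain Java, hypothetical names, not Flink code). The out-of-orderness bound determines the watermark. The allowed lateness, which Flink sets per window operation via allowedLateness(...) on a windowed stream, decides how far behind the watermark a tumbling window still accepts elements. The drop rule is, as far as I know, "window end - 1 + allowed lateness <= watermark". As I understand it, allowed lateness re-fires a window for late elements rather than delaying its first firing; delaying the first firing is what the out-of-orderness bound does.

```java
// Toy model, not Flink code: which knob decides whether an element still
// reaches its event-time tumbling window.
class LatenessModel {
    // Watermark produced by a bounded-out-of-orderness generator.
    static long watermark(long maxTimestampSeen, long boundMillis) {
        return maxTimestampSeen - boundMillis - 1;
    }

    // Start of the tumbling window containing ts (assumes ts >= 0 for simplicity).
    static long windowStart(long ts, long sizeMillis) {
        return ts - (ts % sizeMillis);
    }

    // Dropped if the window's last timestamp plus the allowed lateness is already
    // at or behind the current watermark.
    static boolean isDropped(long ts, long sizeMillis, long allowedLatenessMillis, long currentWatermark) {
        long windowMaxTimestamp = windowStart(ts, sizeMillis) + sizeMillis - 1;
        return windowMaxTimestamp + allowedLatenessMillis <= currentWatermark;
    }
}
```

Take 1-minute windows and a bound of 0, and suppose the watermark has reached 99,999 ms. An element at 30 s is then dropped with no allowed lateness but kept with 2 minutes of allowed lateness. It is also kept if the bound itself is raised to 2 minutes.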
On 1/28/2021 7:52 PM, Pilgrim Beart
wrote: