Hello Flinkers,
Is there a means of configuring a windowed stream such that its window function is invoked for empty panes? Looking at the source of WindowOperator in Flink 1.2.0 (line 473), I see that the user's window function is invoked only when the pane isn't empty. I am hoping for a means of overriding this behavior. Alternatively... is there another mean of creating a stream with a periodic output based on each period's inputs and some stored state? Thank you, Ryan |
Hi Ryan,
afaik there is no way to override this behaviour. Did you have a look at the ProcessFunction [1]? Depending on your specific requirements you might be able to re-implement the desired functionality on top of it, but you will have to do all the time-handling yourself and you do not have window-scoped state. Cheers, Konstantin [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html On 18.04.2017 01:49, Ryan Conway wrote: > Hello Flinkers, > > Is there a means of configuring a windowed stream such that its window > function is invoked for empty panes? > > Looking at the source of WindowOperator in Flink 1.2.0 (line 473), I see > that the user's window function is invoked only when the pane isn't > empty. I am hoping for a means of overriding this behavior. > > Alternatively... is there another mean of creating a stream with a > periodic output based on each period's inputs and some stored state? > > Thank you, > Ryan Konstantin Knauf * [hidden email] * +49-174-3413182 TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke Sitz: Unterföhring * Amtsgericht München * HRB 135082 signature.asc (849 bytes) Download Attachment |
Hi Konstantin, Thank you for your response. That is unfortunate as this is a mission-critical component of our application. I tried process functions first, but ran into problems because their timers can only be controlled when processing an element. A periodic window like this requires the ability to start a timer without an element and to restart a timer when fired. At a high level, we're trying to handle the absence of data while Flink appears to be designed specifically for the presence of data. (I would love to be corrected if I'm wrong.) If anyone has any other ideas I am all ears, and if I'm able to find a solution I'll report it here. Ryan On Mon, Apr 17, 2017 at 9:53 PM, Konstantin Knauf <[hidden email]> wrote: Hi Ryan, |
Hi Ryan,
“A periodic window like this requires the ability to start a timer without an element and to restart a timer when fired.” For the second part, i.e. “to restart a timer when fired”, you can re-register the timer in the onTimer() method (set a new timer for “now + T"), so that the next one fires after T time units, where T is your period. For the first part, where you set the initial timer for a window, this needs to have a first element right? If not, how do you know the key for which to set the timer? Are all the keys known in advance? Kostas
|
Hi Kostas, Re restarting: I missed that ProcessFunction.OnTimerContext extends ProcessFunction.Context! Until now my thought was that OnTimerContext did not provide a means of restarting a timer. Re initial timer, you're right, I'll just need to track a boolean in a state variable that notes whether or not the timer has been initialized. What I am not confident about is how to manage timer recovery after a node failure; I imagine it will make sense to not track this variable. I will do more research and cross that bridge when I get there. So I think a process function will work just fine, here. Thank you again for your time, Kostas and Konstantin. Ryan On Tue, Apr 18, 2017 at 12:07 PM, Kostas Kloudas <[hidden email]> wrote:
|
No problem! Glad I could help!
Kostas
|
I forgot to say that timers are fault-tolerant. You set them, and Flink takes care of checkpointing and
restoring them after failure. The flag will also be fault-tolerant as, i suppose, you will use Flink’s keyed state. For more info, you can check the ProcessFunction documentation that Konstantin provided. There, the example uses a value state to hold the counter, you can do sth similar to keep the flag. Keep in mind that the state will already be scoped by key so you do not have to worry about that either. Kostas
|
Free forum by Nabble | Edit this page |