I found that flink SQL use the specific default trigger, which will not
triggered until the window closes. But sometimes, we need to trigger before window closes. As the class *WindowAssigner *provides method *getDefaultTrigger *with parameter *StreamExecutionEnvironment*, how about passing a custom trigger to *WindowAssigner *by *StreamExecutionEnvironment *? If we can do this, SQL can support custom triggers. It is easy to implement. All we need to do is just add new variables named like *defaultTimeWindowTrigger *and *defaultGlobalWindowTrigger *to*StreamExecutionEnvironment*, which can be set by public setter method. Then *WindowAssigner *could get the *defaultWindowTrigger *or *defaultGlobalWindowTrigger *from *StreamExecutionEnvironment *by *getDefaultTrigger *method. Codes : *StreamExecutionEnvironment*: /** The default trigger used for creating a time window */ private Trigger<Object, TimeWindow> defaultTimeWindowTrigger; /** The default trigger used for creating a global window */ private Trigger<Object, GlobalWindow> defaultGlobalWindowTrigger; /** * Get default trigger of time window * @return */ public Trigger<Object, TimeWindow> getDefaultTimeWindowTrigger() { return defaultTimeWindowTrigger; } /** * Set default trigger of time window * @param defaultTimeWindowTrigger */ public void setDefaultTimeWindowTrigger(Trigger<Object, TimeWindow> defaultTimeWindowTrigger) { this.defaultTimeWindowTrigger = defaultTimeWindowTrigger; } /** * Get default trigger of global window * @return */ public Trigger<Object, GlobalWindow> getDefaultGlobalWindowTrigger() { return defaultGlobalWindowTrigger; } /** * Set default trigger of global window * @param defaultGlobalWindowTrigger */ public void setDefaultGlobalWindowTrigger(Trigger<Object, GlobalWindow> defaultGlobalWindowTrigger) { this.defaultGlobalWindowTrigger = defaultGlobalWindowTrigger; } *TumblingEventTimeWindows/ TumblingProcessingTimeWindows/…* @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { // Get default trigger from StreamExecutionEnvironment Trigger<Object, TimeWindow> defaultTrigger = env.getDefaultTimeWindowTrigger(); if (defaultTrigger != null) { return defaultTrigger; } return EventTimeTrigger.create(); } *GlobalWindows*: @Override public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) { // Get default trigger from StreamExecutionEnvironment Trigger<Object, GlobalWindow> defaultTrigger = env.getDefaultGlobalWindowTrigger(); if (defaultTrigger != null) { return defaultTrigger; } return new NeverTrigger(); } *Look forward to your comments. I would really appreciate taking the time to help me think about this.* -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi, Although this solution looks straight-forward, custom triggers cannot be added that easily. The problem is that a window operator with a Trigger that emit early results produces updates, i.e., results that have been emitted might be updated later. The default Trigger only emits the final result and hence does not produce updates. This is an important difference, because all following operators need to be aware of the updates and be able to process them to prevent incorrect results. Therefore, the query planner needs to be aware of the semantics of the Trigger. This would not be the case if it would be set via the StreamExecutionEnvironment. There is a proposal to add an EMIT clause to SQL queries to control the rate at which results are emitted [1] that might be interesting. Best, Fabian 2018-06-22 4:48 GMT+02:00 YennieChen88 <[hidden email]>: I found that flink SQL use the specific default trigger, which will not |
Free forum by Nabble | Edit this page |