Always trigger calculation of a tumble window in Flink SQL

yinhua.dai
We have a requirement to always trigger a calculation on a timer
basis, e.g. every 1 minute.

*If records arrive in Flink during the time window, calculate the result
the normal way, i.e. aggregate each record and call getResult() at the end
of the time window.*

*If no records arrive in Flink during the time window, send the last
calculated result.*

I know that Flink will not trigger the calculation in the second case (when
no records arrive during the time window). Is there a solution
for me in Flink SQL?
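To make the two cases concrete, here is a plain-Java reference model of the desired output. It is a sketch, not Flink code: it assumes each record is a hypothetical `{timestampMillis, value}` pair, that the calculation is a sum, and that windows are aligned to a hypothetical `firstWindowStart`. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Reference model of the requirement (plain Java, not Flink code).
// Assumptions: each record is {timestampMillis, value}, the calculation is a
// sum, and tumbling windows are aligned to firstWindowStart.
class RequirementModel {

    // Returns one result per tumbling window in [firstWindowStart, endExclusive).
    static List<Long> resultsPerWindow(long[][] records, long windowSizeMillis,
                                       long firstWindowStart, long endExclusive) {
        // Case 1: aggregate every record into the window it falls into.
        TreeMap<Long, Long> sums = new TreeMap<>();
        for (long[] record : records) {
            long offset = Math.floorMod(record[0] - firstWindowStart, windowSizeMillis);
            sums.merge(record[0] - offset, record[1], Long::sum);
        }
        List<Long> results = new ArrayList<>();
        Long last = null; // last calculated result; null until the first one exists
        for (long start = firstWindowStart; start < endExclusive; start += windowSizeMillis) {
            Long current = sums.get(start);
            if (current != null) {
                last = current; // case 1: the window received records
            }
            // Case 2: for an empty window, `last` still holds the previous result.
            results.add(last);
        }
        return results;
    }
}
```

For example, records at 1 s, 2 s and 125 s with values 5, 3 and 7, one-minute windows and a four-minute range give `[8, 8, 7, 7]`: the second and fourth windows are empty and repeat the previous result.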



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Always trigger calculation of a tumble window in Flink SQL

Piotr Nowojski
Hi,

You might come up with some magical self join that could do the trick: join/window join the aggregation result with itself and then aggregate it again. I don’t know if that’s possible (you would probably need to write a custom aggregate function), and it would be inefficient. It will be easier to convert the result of your SQL query into a DataStream and process it with a simple/custom DataStream operator.

Piotrek

> On 5 Nov 2018, at 10:17, yinhua.dai <[hidden email]> wrote:


Re: Always trigger calculation of a tumble window in Flink SQL

yinhua.dai
Hi Piotr,

Can you elaborate on the solution with the custom operator?
I don't think the SQL query will produce any records if no input data
comes in within the time window, even if we convert the result to a
DataStream.




Re: Always trigger calculation of a tumble window in Flink SQL

Piotr Nowojski
Hi,

You would have to register timers (probably based on event time).

Your operator would be a vastly simplified window operator where, for a given window, you keep the record emitted by your SQL query, something like:

MapState<Timestamp, Record> emittedRecords; // map window start -> emitted record

When you process elements, you just put them into this map. To emit the results, register event-time timers; when a timer fires, search the map for the latest record matching the timer's event time (there might be many elements in the map, some older and some newer than the fired timer). You can/should also prune the state in the same timer callback, for example by dropping all windows older than the timer after emitting the result.
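A minimal plain-Java sketch of this operator follows, assuming a hypothetical `ReEmitLastRowSketch` class in which a `TreeMap` stands in for Flink's `MapState` and `onTimer()` is called by hand where Flink would fire an event-time timer. A row keyed by window start S is looked up by the timer at S + window size, hence the `timer - windowSize` lookup. One deviation from the prune step described above: the sketch drops only windows older than the one being emitted and keeps that one, so a later timer with no new data can re-emit it.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch only: simulates the described operator in plain Java (not Flink API).
// R is the type of the row emitted by the SQL query (hypothetical).
class ReEmitLastRowSketch<R> {
    private final long windowSizeMillis;
    // Stand-in for MapState: window start (epoch millis) -> emitted row.
    private final TreeMap<Long, R> emittedRecords = new TreeMap<>();

    ReEmitLastRowSketch(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    // Called for every row produced by the SQL query.
    void processElement(long windowStart, R record) {
        emittedRecords.put(windowStart, record);
    }

    // Called when the timer for the window ending at timerTimestamp fires.
    // Returns the newest row whose window ends at or before the timer, or
    // null if the query has not produced anything yet.
    R onTimer(long timerTimestamp) {
        Map.Entry<Long, R> latest = emittedRecords.floorEntry(timerTimestamp - windowSizeMillis);
        if (latest == null) {
            return null;
        }
        // Prune windows older than the one being emitted; keep that one so a
        // later timer can re-emit it if no newer row arrives in between.
        emittedRecords.headMap(latest.getKey(), false).clear();
        return latest.getValue();
    }

    // Number of windows currently buffered (for inspection only).
    int bufferedWindows() {
        return emittedRecords.size();
    }
}
```

In this sketch, only the newest emitted window plus any rows for windows whose timer has not fired yet stay buffered, which keeps the state small.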

Piotrek

> On 7 Nov 2018, at 02:55, yinhua.dai <[hidden email]> wrote:


Re: Always trigger calculation of a tumble window in Flink SQL

yinhua.dai
Hi Piotr,

Thank you for your explanation.
I think I understand: we can write a custom window assigner and a custom
trigger, and register the timer when the window processes elements.

But how can we register a timer when no elements are received during a time
window?
My requirement is to always fire at the end of the time window, even when the
SQL query produces no result.




Re: Always trigger calculation of a tumble window in Flink SQL

Piotr Nowojski
Re-adding user mailing list to CC

Hi,

> I think I understand: we can write a custom window assigner and a custom trigger, and register the timer when the window processes elements.


No, I was actually suggesting writing your own operator to do that. My bet is that hacking the window operator to make it re-emit the same result when there is no data would be more difficult, if not impossible, while your custom “ReEmitLastRow” operator should be relatively simple.

> But how can we register a timer when no elements are received during a time window?

Upon the first element, register a timer for N seconds in the future. Once it fires, register the next one (you can do that while processing the timer callback), again N seconds in the future, and so on.
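A plain-Java simulation of this self-rescheduling timer pattern is sketched below. It is not Flink's `TimerService`: a `PriorityQueue` plays the role of the registered event-time timers, `advanceWatermark()` is a hypothetical stand-in for watermark progress, and `intervalMillis` is the N from the text. The sketch assumes the watermark keeps advancing; with event time, a source that receives no data may stop advancing its watermark, in which case processing-time timers are the alternative to consider.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simulation (not Flink API) of the timer pattern: the first element registers
// a timer intervalMillis ahead, and every timer callback registers the next
// one, so timers keep firing even when no further elements arrive.
class SelfReschedulingTimers {
    private final long intervalMillis;                                 // "N seconds" from the text
    private final PriorityQueue<Long> timers = new PriorityQueue<>(); // pending event-time timers
    private final List<Long> fired = new ArrayList<>();              // timestamps of fired timers
    private boolean started = false;                                 // would be keyed state in Flink

    SelfReschedulingTimers(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Only the very first element registers a timer.
    void processElement(long timestamp) {
        if (!started) {
            started = true;
            timers.add(timestamp + intervalMillis);
        }
    }

    // Fire every pending timer at or before the new watermark, in timestamp order.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            long timestamp = timers.poll();
            onTimer(timestamp);
        }
    }

    private void onTimer(long timestamp) {
        fired.add(timestamp); // this is where the last result would be emitted
        // Re-register from inside the callback: the next timer N ms later.
        timers.add(timestamp + intervalMillis);
    }

    List<Long> firedTimers() {
        return fired;
    }
}
```

For example, with a 60 s interval, a first element at time 0 and a watermark advanced to 200 s, timers fire at 60 s, 120 s and 180 s even though only one element ever arrived.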

Piotrek

> On 8 Nov 2018, at 07:44, [hidden email] wrote:



> On 7 Nov 2018, at 09:48, Piotr Nowojski <[hidden email]> wrote:


Re: Always trigger calculation of a tumble window in Flink SQL

yinhua.dai
I was able to write a single operator as you suggested, thank you.

Then I saw ContinuousProcessingTimeTrigger in the Flink source code; it
looks like what I am looking for. If there is a way to make the SQL "TUMBLE"
window use this trigger instead of ProcessingTimeTrigger, that should solve
my problem.

Do you know if there is a way to use a custom trigger in the "TUMBLE"
window of SQL?




Re: Always trigger calculation of a tumble window in Flink SQL

Fabian Hueske-2
Hi,

SQL does not support custom triggers or timers.
In general, computations are performed once they are complete with respect to the watermarks (this applies to GROUP BY windows, OVER windows, windowed and time-versioned joins, etc.).
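The simplified plain-Java model below (not Flink's implementation; the `TumbleModel` class is hypothetical) illustrates this for an event-time tumbling-window `COUNT(*)`: a window's result is emitted once the watermark passes the window's end, and a window that never received a row has no state, so nothing is ever emitted for it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of an event-time tumbling-window COUNT(*), not Flink code.
class TumbleModel {
    private final long sizeMillis;
    // Only windows that received at least one row ever get an entry here.
    private final TreeMap<Long, Long> countsByWindowStart = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    TumbleModel(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    // Assign the row to its tumbling window and update the aggregate.
    void onRow(long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, sizeMillis);
        countsByWindowStart.merge(windowStart, 1L, Long::sum);
    }

    // Emit (and drop) every window whose end is covered by the watermark.
    void onWatermark(long watermark) {
        while (!countsByWindowStart.isEmpty()
                && countsByWindowStart.firstKey() + sizeMillis <= watermark) {
            Map.Entry<Long, Long> window = countsByWindowStart.pollFirstEntry();
            output.add(window.getKey() + "->" + window.getValue());
        }
    }

    // Results emitted so far, formatted as "windowStart->count".
    List<String> emitted() {
        return output;
    }
}
```

Rows at 1 s, 2 s and 130 s with a one-minute window and a watermark of 200 s produce output only for the windows starting at 0 and 120 s; the empty window starting at 60 s is silently skipped, which is the behavior described in the original question.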

Best, Fabian

On Fri, 9 Nov 2018 at 05:08, yinhua.dai <[hidden email]> wrote: