Window with recent messages


Krzysztof Białek
Hi,

My app calculates company scores from ratings given by users.
Only ratings from the last 90 days should be considered.

1. Is it possible to construct a window that processes the ratings from the last 90 days?
I started by *misusing* countWindow, but this solution looks ugly to me.

ratingStream
  .filter(new OutdatedRatingsFilter(maxRatingAge))
  .keyBy(_.companyId)
  .countWindow(0L)
  .trigger(new OnEventTrigger)
  .evictor(new OutdatedRatingsEvictor(maxRatingAge))
  .process(ratingFunction)

2. How can I recalculate the score once a rating expires (after 90 days)?
I don't want to put artificial ratings into the stream to trigger the recalculation.
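To pin down what the pipeline above computes, here is a plain-Scala model (no Flink involved) of the countWindow/evictor approach: each incoming rating evicts the outdated ratings of its company and recomputes the score. The names `Rating` and `EvictOnEventScorer`, and the choice of the average rating as the score, are hypothetical assumptions, not taken from the original code. The model also shows the problem behind question 2: the score is only recomputed inside `onRating`, so a rating that expires while no new rating arrives for that company keeps contributing to the last emitted score.

```scala
import scala.collection.mutable

// Hypothetical model of the question's pipeline; not Flink API code.
final case class Rating(companyId: String, score: Double, timestampMs: Long)

// Mimics GlobalWindows + a fire-on-every-element trigger + an age-based
// evictor: one buffer per company, trimmed and re-scored only when a new
// rating for that company arrives. Assumes ratings arrive in timestamp order.
final class EvictOnEventScorer(maxRatingAgeMs: Long) {
  private val buffers = mutable.Map.empty[String, Vector[Rating]]

  // Returns the new score (assumed: average rating) for r.companyId.
  def onRating(r: Rating): Double = {
    val kept = buffers.getOrElse(r.companyId, Vector.empty[Rating])
      .filter(old => r.timestampMs - old.timestampMs < maxRatingAgeMs) :+ r
    buffers(r.companyId) = kept
    kept.map(_.score).sum / kept.size
  }
}
```

For instance, with ratings 4.0 on day 0 and 2.0 on day 10, a rating of 5.0 on day 95 evicts the day-0 rating and yields 3.5; if that third rating never arrived, the emitted score would stay at 3.0 even after day 90.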

Any ideas on how I could do this better?

Regards,
Krzysztof



Re: Window with recent messages

Fabian Hueske-2
Hi Krzysztof,

You could compute the stats in two stages:

1) Compute a daily window. If possible, use a ReduceFunction or AggregateFunction here so that the computation is performed eagerly.
2) Compute a sliding window of 90 days with a 1-day hop (or a count window of 90 rows with a 1-row hop).

The first window crunches the data down, and the second window only has to combine the pre-aggregated results.
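The two-stage plan can be modeled in plain Scala to show why it is cheap: stage 1 folds all ratings of a company and day into one `Partial` (sum, count), which is what an eager ReduceFunction or AggregateFunction would keep, and stage 2 merges at most 90 partials per window instead of every raw rating. This is a sketch under stated assumptions (score = average rating, days counted as epoch days of the millisecond timestamp, hypothetical names `Rating`, `Partial`, `TwoStage`), not Flink API code. In Flink, stage 1 would typically be a 1-day tumbling event-time window with an aggregate function, and stage 2 a 90-day sliding window with a 1-day slide over its output.

```scala
// Hypothetical plain-Scala model of the two-stage aggregation; not Flink API.
final case class Rating(companyId: String, score: Double, timestampMs: Long)

// Stage-1 accumulator, kept eagerly per (company, day).
final case class Partial(sum: Double, count: Long) {
  def merge(o: Partial): Partial = Partial(sum + o.sum, count + o.count)
  def average: Double = sum / count
}

object TwoStage {
  val DayMs: Long = 24L * 60 * 60 * 1000

  // Stage 1: one Partial per (company, epoch day), built incrementally.
  def daily(ratings: Seq[Rating]): Map[(String, Long), Partial] =
    ratings.foldLeft(Map.empty[(String, Long), Partial]) { (acc, r) =>
      val key = (r.companyId, Math.floorDiv(r.timestampMs, DayMs))
      val p = Partial(r.score, 1)
      acc.updated(key, acc.get(key).fold(p)(_.merge(p)))
    }

  // Stage 2: the window ending with `lastDay` (inclusive) merges the partials
  // of the `windowDays` most recent days. Each daily partial is computed once
  // and reused by up to `windowDays` overlapping windows.
  def slidingScore(partials: Map[(String, Long), Partial], company: String,
                   lastDay: Long, windowDays: Int = 90): Option[Double] =
    (lastDay - windowDays + 1 to lastDay)
      .flatMap(d => partials.get((company, d)))
      .reduceOption(_.merge(_))
      .map(_.average)
}
```

For a company with ratings 4.0 and 2.0 on day 0 and 3.0 on day 5, the window ending on day 5 averages all three (3.0), while the window ending on day 90 covers days 1 to 90 and only sees the day-5 partial.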

Hope this helps,
Fabian

In reply to Krzysztof Białek's post of 2018-02-19 16:36 GMT+01:00




Re: Window with recent messages

Krzysztof Białek
Hi Fabian,

Thank you for your suggestion. In the meantime, I rethought the problem and implemented an alternative solution that does not use windows at all.
I used a plain ProcessFunction with:
1. Keyed state (keyed by companyId) to keep the ratings per key
2. Event-time timers to remove outdated ratings from the state and emit the recalculated score immediately

This solution produces results in real time; with windows, I think the results would be delayed by 1 day.
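Krzysztof's code is not reproduced in the archive, so the following is only a hypothetical plain-Scala model of the two ingredients he lists, not his implementation and not Flink API code. `state` plays the role of keyed state per companyId; `processElement` stores a rating, registers an event-time timer at `timestampMs + maxRatingAgeMs`, and emits the new score right away; `advanceWatermark` stands in for Flink calling `onTimer` once the watermark passes a timer. In a real ProcessFunction on a keyed stream, the state would be something like a ListState or MapState, and the timer would be registered through `ctx.timerService().registerEventTimeTimer(...)`. Using the average rating as the score is again an assumption.

```scala
import scala.collection.mutable

final case class Rating(companyId: String, score: Double, timestampMs: Long)

// Hypothetical model of a keyed ProcessFunction with event-time timers.
final class ExpiringScoreModel(maxRatingAgeMs: Long) {
  private val state = mutable.Map.empty[String, Vector[Rating]] // keyed state
  private val timers = mutable.SortedSet.empty[(Long, String)]  // (time, key), deduplicated
  val emitted = mutable.ArrayBuffer.empty[(String, Option[Double])]

  // Emits the current score of `key`, or None when no valid rating is left.
  private def emit(key: String): Unit = {
    val rs = state.getOrElse(key, Vector.empty[Rating])
    emitted += key -> (if (rs.isEmpty) None else Some(rs.map(_.score).sum / rs.size))
  }

  // processElement: store the rating, schedule its expiry, emit immediately.
  def processElement(r: Rating): Unit = {
    state(r.companyId) = state.getOrElse(r.companyId, Vector.empty[Rating]) :+ r
    timers += ((r.timestampMs + maxRatingAgeMs, r.companyId))
    emit(r.companyId)
  }

  // Watermark advance: fire every timer with time <= watermark (onTimer).
  def advanceWatermark(watermark: Long): Unit = {
    val due = timers.takeWhile(_._1 <= watermark).toList
    timers --= due
    for ((time, key) <- due) {
      // Drop the ratings that have reached the age limit at `time`, then re-emit.
      state(key) = state.getOrElse(key, Vector.empty[Rating])
        .filter(r => r.timestampMs + maxRatingAgeMs > time)
      emit(key)
    }
  }
}
```

With a 90-day limit, ratings 4.0 on day 0 and 2.0 on day 10 emit 4.0 and then 3.0; advancing the watermark to day 90 drops the first rating and re-emits 2.0, and at day 100 no valid rating is left, which the model emits as None. No artificial ratings are needed: the timers alone trigger the recalculation.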

Regards,
Krzysztof


In reply to Fabian Hueske's post of Thu, Feb 22, 2018 at 9:44 AM





Re: Window with recent messages

Fabian Hueske-2
Hi Krzysztof,

Thanks for sharing your solution!
ProcessFunctions are the Swiss army knife of Flink :-)

Cheers, Fabian

In reply to Krzysztof Białek's post of 2018-02-22 19:55 GMT+01:00






Re: Window with recent messages

m@xi
In reply to this post by Krzysztof Białek
Hi Krzysztof,

I want to do something very similar, if not identical, to what you did:
apply sliding windows to my input streams using "the Swiss army knife" of
Flink. If it is easy and not a problem for you, it would be great if
you could post the skeleton code of your solution here.

Thanks in advance.

Best,
Max




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Window with recent messages

Krzysztof Białek
Hi m@xi,

Here it is (as a concept only: not run, not tested).

Note that your stream must contain watermarks. Otherwise, event-time timers won't be triggered.
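The watermark requirement can be illustrated with a small plain-Scala model (hypothetical classes, not Flink API): event-time timers fire only when the watermark reaches their timestamp, so if the watermark never advances, no timer ever fires. The generator below mimics the idea behind Flink's BoundedOutOfOrdernessTimestampExtractor: the watermark trails the largest timestamp seen so far by a fixed out-of-orderness bound and never moves backwards.

```scala
import scala.collection.mutable

// Hypothetical bounded-out-of-orderness watermark generator (not Flink API).
// Assumes maxOutOfOrdernessMs >= 0.
final class BoundedOutOfOrdernessWatermarks(maxOutOfOrdernessMs: Long) {
  private var maxSeen = Long.MinValue + maxOutOfOrdernessMs
  private var lastEmitted = Long.MinValue

  def onEvent(timestampMs: Long): Unit = {
    maxSeen = math.max(maxSeen, timestampMs)
  }

  // Polled periodically; the watermark never moves backwards.
  def currentWatermark: Long = {
    lastEmitted = math.max(lastEmitted, maxSeen - maxOutOfOrdernessMs)
    lastEmitted
  }
}

// Hypothetical event-time timer queue: timers fire only when a watermark passes them.
final class EventTimeTimers {
  private val pending = mutable.SortedSet.empty[Long]

  def register(timeMs: Long): Unit = pending += timeMs

  // Returns (and removes) the timers that fire for the given watermark.
  def advanceTo(watermark: Long): List[Long] = {
    val due = pending.takeWhile(_ <= watermark).toList
    pending --= due
    due
  }
}
```

With a 5-second bound, a timer at 10000 ms does not fire after an event at 12000 ms (watermark 7000 ms) or after a late event at 11000 ms, but does fire once an event at 15000 ms pushes the watermark to 10000 ms.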

Regards,
Krzysztof

In reply to m@xi's post of Sat, Feb 24, 2018 at 10:00 AM


Re: Window with recent messages

m@xi
Hello there, Krzysztof!

Thanks a lot for the answer, and sorry for the late reply. I can see the logic
behind custom window processing in Flink.

Once an incoming tuple arrives, you register a timer for it, which fires
after "RatingExpiration" time units, as shown in your code. This is
done *for each tuple*.
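A timer per tuple works, but Flink's timer service keeps at most one timer per key and timestamp, so the number of timers can be reduced by coalescing them; the Flink documentation on ProcessFunction timers describes this as timer coalescing. The plain-Scala sketch below (hypothetical names, not Flink API) rounds each expiry time up to the next full day, so a company gets at most one expiry timer per day; the trade-off is that an expired rating may be removed up to one day late.

```scala
// Hypothetical sketch of timer coalescing; not Flink API code.
object TimerCoalescing {
  val DayMs: Long = 24L * 60 * 60 * 1000

  // Round a time up to the next multiple of `resolutionMs` (ceiling division).
  def coalesce(expiryMs: Long, resolutionMs: Long = DayMs): Long =
    Math.floorDiv(expiryMs + resolutionMs - 1, resolutionMs) * resolutionMs

  // Models per-key timer deduplication: registering the same (key, time)
  // twice yields a single timer, so a Set of pairs is the timer population.
  def registeredTimers(ratings: Seq[(String, Long)], maxAgeMs: Long): Set[(String, Long)] =
    ratings.map { case (key, ts) => (key, coalesce(ts + maxAgeMs)) }.toSet
}
```

Ratings for "acme" at 1 s, 5 s and one day plus 1 ms, plus one rating for another company at 1 s, need only three timers instead of four.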

I have the following questions:

1 -- In my case, I do not have timestamps a priori, so I must attach a
timestamp to the tuples as they arrive at the sources. [1] shows how to
assign timestamps to my data. Is this the correct way to do it? Also, which
type of watermarks is better to assign? And which notion of time is it
more reasonable to use: {EventTime, ProcessingTime, IngestionTime}?

2 -- The range of the sliding window equals "RatingExpiration", if I am
correct. But where is the slide of the sliding window defined? I guess the
slide has to do with the query, meaning: every *s* time units, evaluate the
query over the data from the last *r* time units.

3 -- If I manage to assign the timestamps correctly as described above, then,
based on your skeleton code, it is trivial to simulate *time-based* sliding
windows. What about *count-based* sliding windows?
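Both variants in these questions can be made concrete with a plain-Scala model (hypothetical names, not Flink API). In the time-based case, the slide is simply how often the evaluation runs: at every multiple `t` of `slide`, the values with timestamps in `[t - range, t)` are evaluated, the same half-open convention Flink uses for its time windows. In the count-based case, the last `size` elements are kept and an evaluation runs after every `slide` elements; as far as I know, this matches Flink's built-in `countWindow(size, slide)`, which combines a count evictor with a count trigger. In a ProcessFunction, the time-based variant would need a timer every `slide` time units, while the count-based one could be driven by a counter kept in keyed state.

```scala
// Hypothetical model of sliding-window evaluation semantics; not Flink API.
object SlidingEvaluation {
  // Time-based: at t = slide, 2*slide, ... <= until, evaluate the values
  // whose timestamps fall in the half-open range [t - range, t).
  def timeBased(events: Seq[(Long, Double)], range: Long, slide: Long,
                until: Long): Seq[(Long, Seq[Double])] =
    (slide to until by slide).map { t =>
      t -> events.collect { case (ts, v) if ts >= t - range && ts < t => v }
    }

  // Count-based: after every `slide` elements, evaluate the last `size`
  // elements (fewer at the start of the stream).
  def countBased[A](events: Seq[A], size: Int, slide: Int): Seq[Seq[A]] =
    events.indices.collect {
      case i if (i + 1) % slide == 0 => events.slice(math.max(0, i + 1 - size), i + 1)
    }
}
```

For values 1.0, 2.0, 3.0 and 4.0 at times 1, 4, 6 and 9, with range 6 and slide 3, the evaluations at times 3, 6 and 9 see [1.0], [1.0, 2.0] and [2.0, 3.0]; the overlap between consecutive evaluations is what makes the window sliding.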

Thanks a lot in advance.

Best,
Max

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html


