Finding things not seen in the last window

Ron Crocker
Hi -

I have a colleague who is trying to write a Flink job that determines deltas from period to period. Let’s say the periods are 1 minute. What he would like to do is report, in minute 2, the things that are new since minute 1; then, in minute 3, report the things that are new since minutes 1 and 2 (that is, never seen before), and so on.

For example, suppose the stream looks like this:
minute | name
=======|=======
     1 | abc
     1 | def
     2 | abc
     2 | ghi
     3 | abc
     3 | def
     4 | ghi
     4 | jkl

What we would like to report is:
minute | count | names
=======|=======|=======
     1 |     2 | abc, def
     2 |     1 | ghi
     3 |     0 |
     4 |     1 | jkl

In minute 2, abc was already seen but ghi is new, so it gets reported as new. In minute 3, abc and def have already been seen, so there are no new names. In minute 4, ghi has already been seen but jkl is new, so we report the 1 new name.
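
To pin down the semantics the example implies, here is a small reference model in plain Java (not Flink code). It assumes the input arrives ordered by minute and treats "new" as "not seen in any earlier minute", which is what the expected report shows. The class, record, and method names are hypothetical.

```java
import java.util.*;

/** Hypothetical reference model: for each minute, the names never seen in an earlier minute. */
public class NewNamesReport {
    public record Event(int minute, String name) {}

    /** Returns minute -> names first seen in that minute, in order of first appearance. */
    public static SortedMap<Integer, List<String>> newNamesPerMinute(List<Event> events) {
        Set<String> seen = new HashSet<>();
        SortedMap<Integer, List<String>> report = new TreeMap<>();
        // Assumes events are ordered by minute; out-of-order input would change "first seen".
        for (Event e : events) {
            // Create the minute's entry even if nothing is new, so empty minutes are reported.
            List<String> fresh = report.computeIfAbsent(e.minute(), m -> new ArrayList<>());
            if (seen.add(e.name())) { // add() returns true only the first time a name appears
                fresh.add(e.name());
            }
        }
        return report;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(1, "abc"), new Event(1, "def"),
            new Event(2, "abc"), new Event(2, "ghi"),
            new Event(3, "abc"), new Event(3, "def"),
            new Event(4, "ghi"), new Event(4, "jkl"));
        newNamesPerMinute(events).forEach((minute, names) ->
            System.out.println(minute + " | " + names.size() + " | " + String.join(", ", names)));
    }
}
```

Run on the sample stream, this yields abc, def for minute 1, ghi for minute 2, nothing for minute 3, and jkl for minute 4, matching the table above. Note that the `seen` set grows without bound, which is exactly what a streaming version has to deal with.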

I’m struggling to help him and thought someone here might have ideas. I have thought about merging two streams (the stream of new things and the stream of the full set seen so far) but haven’t tried that yet.

I welcome any of your inputs.

Thanks!

Ron
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
M: +1 630 363 8835

Re: Finding things not seen in the last window

Haohui Mai
Hi,

Assuming FLINK-6465 lands, would something like

SELECT COUNT(*) FROM (SELECT FIRST_VALUE(names) FROM stream) GROUP BY HOP(proctime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE)

work?

~Haohui

On Fri, Sep 29, 2017 at 6:52 PM Ron Crocker <[hidden email]> wrote:

Re: Finding things not seen in the last window

Aljoscha Krettek
Hi Ron,

I think your colleague might be able to do that using a ProcessFunction with MapState and timers. The MapState is used to determine whether a record is new. Timers would be used to schedule emission and also to schedule cleanup of entries from the MapState. For cleanup, the entries in the MapState could carry a timestamp that you check when a cleanup timer fires.
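
To see how these pieces fit together without a Flink cluster, here is a rough plain-Java model of that ProcessFunction. It is not Flink API code: a HashMap stands in for a MapState<String, Long> (name to last minute seen), a sorted set of times stands in for the timer service, and advanceTo() plays the role of the watermark passing the end of a minute. It assumes all names go through one key, and the retention parameter and all class and method names are hypothetical. In a real job, the same logic would sit in processElement() and onTimer(), with timers registered through ctx.timerService().

```java
import java.util.*;

/**
 * Plain-Java model (NOT Flink code) of the ProcessFunction idea.
 * 'seen' stands in for MapState<String, Long>; 'timers' stands in for the timer service.
 */
public class NewNameDetector {
    private final long retention;                                    // hypothetical: minutes a name is remembered
    private final Map<String, Long> seen = new HashMap<>();          // name -> last minute seen
    private final Map<Long, List<String>> pending = new HashMap<>(); // new names buffered until their minute closes
    private final TreeSet<Long> timers = new TreeSet<>();            // registered timer times
    private final SortedMap<Long, List<String>> emitted = new TreeMap<>();

    public NewNameDetector(long retention) { this.retention = retention; }

    /** Analogue of processElement(): test state, buffer new names, refresh timestamp, register timers. */
    public void processElement(long minute, String name) {
        List<String> fresh = pending.computeIfAbsent(minute, m -> new ArrayList<>());
        if (!seen.containsKey(name)) {
            fresh.add(name);
        }
        seen.put(name, minute);          // timestamp checked later by cleanup
        timers.add(minute);              // emission timer: fires once this minute is complete
        timers.add(minute + retention);  // cleanup timer for this entry
    }

    /** Analogue of the watermark reaching 'minute': fire all due timers in time order. */
    public void advanceTo(long minute) {
        while (!timers.isEmpty() && timers.first() <= minute) {
            onTimer(timers.pollFirst());
        }
    }

    /** Analogue of onTimer(): emit the minute's new names, then evict stale entries. */
    private void onTimer(long t) {
        List<String> fresh = pending.remove(t);
        if (fresh != null) {
            emitted.put(t, fresh);
        }
        // Stale = not refreshed within 'retention' minutes (a full scan, for simplicity).
        seen.values().removeIf(last -> last + retention <= t);
    }

    public SortedMap<Long, List<String>> emitted() { return emitted; }

    public static void main(String[] args) {
        NewNameDetector d = new NewNameDetector(100); // long retention ~ "never seen before"
        d.processElement(1, "abc"); d.processElement(1, "def"); d.advanceTo(1);
        d.processElement(2, "abc"); d.processElement(2, "ghi"); d.advanceTo(2);
        d.processElement(3, "abc"); d.processElement(3, "def"); d.advanceTo(3);
        d.processElement(4, "ghi"); d.processElement(4, "jkl"); d.advanceTo(4);
        d.emitted().forEach((m, names) ->
            System.out.println(m + " | " + names.size() + " | " + String.join(", ", names)));
    }
}
```

With a long retention the model reproduces Ron's expected report. With a retention of 1 minute it instead reports names not seen in the previous minute (def in minute 3, ghi and jkl in minute 4), so the cleanup horizon is what decides whether "new" means "never seen" or "not seen recently", and it also bounds how large the state can grow.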

Best,
Aljoscha


On 30. Sep 2017, at 19:22, Haohui Mai <[hidden email]> wrote: