Working with the Windowing functionality

Niels Basjes
Hi,

I'm trying to build something in Flink that relies heavily on the Windowing features.

In essence what I want to build:
I have clickstream data coming in via Kafka. Each record (click) has a sessionid and a timestamp.
I want to create a window for each session and after 30 minutes idle I want all events for that session (visit) to be written to disk.
This should result in the effect that a specific visit exists in exactly one file.
Since HDFS does not like 'small files', I want to create a (set of) files every 15 minutes that contain several complete visits.
So I need to buffer the 'completed visits' and flush them to disk in 15 minute batches.

What I think I need to get this is:
1) A map function that assigns the visit-id (i.e. new id after 30 minutes idle)
2) A window per visit-id (close the window 30 minutes after the last click) 
3) A window per 15 minutes that only contains windows of visits that are complete 
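
A minimal sketch of step 1 in plain Java, outside of Flink. It assumes that the clicks of one session arrive in timestamp order and that state is kept in memory only; the class name VisitIdAssigner and the "<sessionId>-<n>" id format are hypothetical, chosen just for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: starts a new visit for a session
// whenever the gap since the previous click exceeds the idle timeout.
class VisitIdAssigner {
    static final long IDLE_TIMEOUT_MS = 30 * 60 * 1000L; // 30 minutes

    private final Map<String, Long> lastClickTime = new HashMap<>();
    private final Map<String, Integer> visitCounter = new HashMap<>();

    /** Returns a visit id of the form "<sessionId>-<n>" for a click at the given time. */
    String assign(String sessionId, long timestampMs) {
        // put() returns the previous click time for this session, or null
        Long previous = lastClickTime.put(sessionId, timestampMs);
        int visit = visitCounter.getOrDefault(sessionId, 0);
        if (previous == null || timestampMs - previous > IDLE_TIMEOUT_MS) {
            visit++; // first click, or idle for more than 30 minutes: new visit
            visitCounter.put(sessionId, visit);
        }
        return sessionId + "-" + visit;
    }
}
```

Inside a real job this state would have to live in Flink's keyed state to survive failures; the sketch only shows the gap logic.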

Today I've been trying to get this set up and I think I have some parts that are heading in the right direction.

I have some questions and I'm hoping you guys can help me:

1) I have trouble understanding exactly how a windowed stream works.
As a consequence I'm having a hard time verifying whether my code does what I think it should do.
I guess what would really help me is a very simple example of how to unit test such a window.

2) Has what I describe above perhaps already been done before? If so, any pointers would be really appreciated.

3) Am I working in the right direction for what I'm trying to achieve, or should I use a different API or a different approach?

Thanks

--
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: Working with the Windowing functionality

Aljoscha Krettek
Hi Niels,
do the records that arrive from Kafka already have the session ID or do you want to assign them inside your Flink job based on the idle timeout?

For the rest of your problems you should be able to get by with what Flink provides:

The triggering can be done using a custom Trigger that fires after we haven’t seen an element for 30 minutes.
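import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class TimeoutTrigger implements Trigger<Object, Window> {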
   private static final long serialVersionUID = 1L;

   @Override
   public TriggerResult onElement(Object element, long timestamp, Window window, TriggerContext ctx) throws Exception {
      // on every element it will set a timer for 30 seconds in the future
      // a trigger can only have 1 timer so we remove the old trigger when setting the new one
      ctx.registerProcessingTimeTimer(System.currentTimeMillis() + 30000); // this is 30 seconds but you can change it
      return TriggerResult.CONTINUE;
   }

   @Override
   public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) {
      return TriggerResult.CONTINUE;
   }

   @Override
   public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception {
      return TriggerResult.FIRE_AND_PURGE;
   }

   @Override
   public String toString() {
      return "TimeoutTrigger()";
   }
}

You would use it like this:
stream.keyBy(…).window(…).trigger(new TimeoutTrigger())

For writing to files you could use the RollingSink (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#hadoop-filesystem). I think this does pretty much what you want. You can specify how large the files that it writes are, and it can also roll to new files on a specified time interval.
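
To illustrate the idea of rolling to a new file on a fixed time interval, here is a plain Java sketch that maps a timestamp to its 15-minute bucket. This is not the RollingSink API; the class QuarterHourBucket, the UTC time zone and the "yyyy-MM-dd--HHmm" naming pattern are assumptions made for the example:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: derives a 15-minute bucket from a timestamp, so that
// everything flushed within the same quarter hour ends up under one name.
class QuarterHourBucket {
    static final long BUCKET_MS = 15 * 60 * 1000L;

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HHmm").withZone(ZoneOffset.UTC);

    /** Start of the 15-minute bucket that contains the given timestamp. */
    static long bucketStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, BUCKET_MS);
    }

    /** Bucket name (UTC) that could serve as a directory or file name. */
    static String bucketName(long timestampMs) {
        return FORMAT.format(Instant.ofEpochMilli(bucketStart(timestampMs)));
    }
}
```

A completed visit would then be written to the bucket of the moment it is flushed, not of its first click, so that a visit never spans two files.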

Please let us know if you need more information.

Cheers,
Aljoscha

> On 26 Nov 2015, at 22:13, Niels Basjes <[hidden email]> wrote:
> [...]

Re: Working with the Windowing functionality

Niels Basjes
Hi,

Thanks for all this input.
I didn't know about this:
      // a trigger can only have 1 timer so we remove the old trigger when setting the new one

This insight is of major importance to me.
Let me explain:
I found the code below in the WindowOperator.

@Override
public void registerEventTimeTimer(long time) {
   if (watermarkTimer == time) {
      // we already have set a trigger for that time
      return;
   }
   Set<Context> triggers = watermarkTimers.get(time);
   if (triggers == null) {
      triggers = new HashSet<>();
      watermarkTimers.put(time, triggers);
   }
   this.watermarkTimer = time;
   triggers.add(this);
}

and

if (time == watermarkTimer) {
   watermarkTimer = -1;
   Trigger.TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this);

Effectively, every new timer value is stored and processed, yet when a superseded timer fires the call is not forwarded into the application.
So if I did it as shown in your example, I would have as many trigger entries in the watermarkTimers map as I have seen events.
My application will (in total) handle about 50K events/sec, resulting in thousands of 'onEventTime' calls per second.

So thank you. I now understand I have to be more careful with these timers!
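
The effect can be reproduced with a small stand-alone model of the two snippets quoted above. This is a simplified sketch in plain Java, not the real WindowOperator: the class TimerRegistrySketch, the forwardedCalls counter and the advanceWatermark method are hypothetical names introduced for the illustration.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Simplified model of the quoted logic: every distinct timer time is stored,
// but only the most recently registered one is forwarded when it fires.
class TimerRegistrySketch {
    // timer time -> contexts that registered a timer for that time
    final TreeMap<Long, Set<Context>> watermarkTimers = new TreeMap<>();
    int forwardedCalls = 0; // stands in for calls to trigger.onEventTime(...)

    class Context {
        long watermarkTimer = -1;

        void registerEventTimeTimer(long time) {
            if (watermarkTimer == time) {
                return; // a timer for exactly this time is already set
            }
            watermarkTimers.computeIfAbsent(time, t -> new HashSet<>()).add(this);
            watermarkTimer = time;
        }
    }

    Context newContext() {
        return new Context();
    }

    /** Processes all stored timers up to and including the watermark. */
    void advanceWatermark(long watermark) {
        Map<Long, Set<Context>> due = watermarkTimers.headMap(watermark, true);
        for (Map.Entry<Long, Set<Context>> entry : due.entrySet()) {
            long time = entry.getKey();
            for (Context ctx : entry.getValue()) {
                if (time == ctx.watermarkTimer) { // stale timers are skipped
                    ctx.watermarkTimer = -1;
                    forwardedCalls++;
                }
            }
        }
        due.clear(); // the view is backed by watermarkTimers, so this removes the entries
    }
}
```

Registering one timer per event for 1000 events with increasing times leaves 1000 entries in the map, while only a single call is forwarded once the watermark passes them.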

Niels Basjes



On Fri, Nov 27, 2015 at 11:28 AM, Aljoscha Krettek <[hidden email]> wrote:
[...]
Re: Working with the Windowing functionality

Aljoscha Krettek-2
Hi,
yes, you are right in your analysis. Did you try running it with always setting the timer? Maybe it’s not the bottleneck of the computation. I would be very interested in seeing how this behaves since I only did tests with regular time windows, where the first if statement almost always directly returns, which is very cheap.

Cheers,
Aljoscha

> On 27 Nov 2015, at 13:59, Niels Basjes <[hidden email]> wrote:
> [...]

Re: Working with the Windowing functionality

Niels Basjes
In my specific case I can be more precise about the next point in time at which it makes sense to check.
For all subsequent events I then only record a little bit of state about the event stream (i.e. first and last event time).
I register a new timer only from within the timer handler; this way I can dramatically limit the number of distinct timers that get set.
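
Roughly, the idea looks like the following plain Java sketch. It is not a Flink Trigger; the class LazyTimeoutSketch and its method names are hypothetical, and the caller is assumed to invoke onTimer when the pending timer becomes due:

```java
// Sketch of the "re-arm only in the timer handler" idea: elements just update
// state, and at most one timer per session is pending at any time.
class LazyTimeoutSketch {
    final long timeoutMs;
    long firstEventTime = -1;
    long lastEventTime = -1;
    long pendingTimer = -1;   // -1 means no timer is currently registered
    int timersRegistered = 0; // counts how many timers were ever set

    LazyTimeoutSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Called for every event: records state, sets a timer only if none is pending. */
    void onElement(long eventTime) {
        if (firstEventTime < 0) {
            firstEventTime = eventTime;
        }
        lastEventTime = Math.max(lastEventTime, eventTime);
        if (pendingTimer < 0) {
            register(eventTime + timeoutMs);
        }
    }

    /** Called when the pending timer fires; returns true if the session timed out. */
    boolean onTimer(long time) {
        pendingTimer = -1;
        long deadline = lastEventTime + timeoutMs;
        if (deadline > time) {
            register(deadline); // events arrived in the meantime: check again later
            return false;
        }
        firstEventTime = -1;    // session is complete: reset the state
        lastEventTime = -1;
        return true;
    }

    private void register(long time) {
        pendingTimer = time;
        timersRegistered++;
    }
}
```

With a timeout of 30 and 21 events at times 0 to 20, only two timers are ever registered (at 30 and at 50) instead of one per event.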

Niels


On Fri, Nov 27, 2015 at 4:11 PM, Aljoscha Krettek <[hidden email]> wrote:
[...]