How to use EventTimeSessionWindows.withDynamicGap()

classic Classic list List threaded Threaded
11 messages Options
Reply | Threaded
Open this post in threaded view
|

How to use EventTimeSessionWindows.withDynamicGap()

SimAzz
Hi All,

I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I have understood that the gap is computed dynamically by a function on each element. What I should be able to obtain is a Flink application that can automatically manage the windows based on the frequency of the data. (if I have understood correctly)

But I'm wondering if there is any parameter to adjust the computation to do more windows or less windows considering the same data.

I have my event that provide "millis" of which I would like to pass to the function but I don't understand how, for the moment I'm trying with the code below but no luck.. Can you please give me some help? Thanks!

     
        FlinkKafkaConsumer<Event> kafkaData =
                new FlinkKafkaConsumer("CorID_1", new EventDeserializationSchema(), p);
        WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withIdleness(Duration.ofMinutes(1))
                        .withTimestampAssigner((event, timestamp) -> { return event.get_Time();

                        });

        DataStream<Event> stream = env.addSource(
                kafkaData.assignTimestampsAndWatermarks(wmStrategy));


        DataStream<Event> Data = stream
                .keyBy((Event ride) -> ride.CorrID)
                .window(EventTimeSessionWindows.withDynamicGap((event)->{
                    return event.get_Time();}));



Where from the load of the message which i receive from Kafka i convert the date time in millis.

 public long get_Time() {
        long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
        this.millis = tn;
        return millis;
    }
    public void set_a_t_rt(String a_t_rt) {
        this.a_t_rt = a_t_rt;
    }




Reply | Threaded
Open this post in threaded view
|

Re: How to use EventTimeSessionWindows.withDynamicGap()

Aljoscha Krettek
Hi,

I'm not sure that what you want is possible. You say you want more
windows when there are more events for a given time frame? That is when
the events are more dense in time?

Also, using the event timestamp as the gap doesn't look correct. The gap
basically specifies the timeout for a session (and I now realize that
maybe "gap" is not a good word for that). So if your timeout increases
as time goes on your successive sessions will just get bigger and bigger.

Best,
Aljoscha

On 12.11.20 15:56, Simone Cavallarin wrote:

> Hi All,
>
> I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I have understood that the gap is computed dynamically by a function on each element. What I should be able to obtain is a Flink application that can automatically manage the windows based on the frequency of the data. (if I have understood correctly)
>
> But I'm wondering if there is any parameter to adjust the computation to do more windows or less windows considering the same data.
>
> I have my event that provide "millis" of which I would like to pass to the function but I don't understand how, for the moment I'm trying with the code below but no luck.. Can you please give me some help? Thanks!
>
>
>          FlinkKafkaConsumer<Event> kafkaData =
>                  new FlinkKafkaConsumer("CorID_1", new EventDeserializationSchema(), p);
>          WatermarkStrategy<Event> wmStrategy =
>                  WatermarkStrategy
>                          .<Event>forMonotonousTimestamps()
>                          .withIdleness(Duration.ofMinutes(1))
>                          .withTimestampAssigner((event, timestamp) -> { return event.get_Time();
>
>                          });
>
>          DataStream<Event> stream = env.addSource(
>                  kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>
>
>          DataStream<Event> Data = stream
>                  .keyBy((Event ride) -> ride.CorrID)
>                  .window(EventTimeSessionWindows.withDynamicGap((event)->{
>                      return event.get_Time();}));
>
>
>
> Where from the load of the message which i receive from Kafka i convert the date time in millis.
>
>   public long get_Time() {
>          long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
>          this.millis = tn;
>          return millis;
>      }
>      public void set_a_t_rt(String a_t_rt) {
>          this.a_t_rt = a_t_rt;
>      }
>
>
>
>
>

Reply | Threaded
Open this post in threaded view
|

Re: How to use EventTimeSessionWindows.withDynamicGap()

SimAzz
Hi Aljoscha,

Yes correct i would like to have more windows when there are more events for a given time frame. That is when
the events are more dense in time. I can calculate the time difference between each event and create a parameter that can create windows of different sizes dynamically based on past events. Maybe on the beginning it will be starting for a fix parameter but then the parameter should be learning and accommodate the data accordingly

Could you please give me an example on how to set the timeout?

I have been reading all around and I'm a bit confused. I thought that flink can create more windows when the events are more dense in time quite easily (https://www.ververica.com/blog/session-windowing-in-flink ).



To avoid having the successive sessions become bigger and bigger so should I  create a cap for example 1 min?

Many thanks for the help!
Best
Simon


From: Aljoscha Krettek <[hidden email]>
Sent: 12 November 2020 16:34
To: [hidden email] <[hidden email]>
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
 
Hi,

I'm not sure that what you want is possible. You say you want more
windows when there are more events for a given time frame? That is when
the events are more dense in time?

Also, using the event timestamp as the gap doesn't look correct. The gap
basically specifies the timeout for a session (and I now realize that
maybe "gap" is not a good word for that). So if your timeout increases
as time goes on your successive sessions will just get bigger and bigger.

Best,
Aljoscha

On 12.11.20 15:56, Simone Cavallarin wrote:
> Hi All,
>
> I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I have understood that the gap is computed dynamically by a function on each element. What I should be able to obtain is a Flink application that can automatically manage the windows based on the frequency of the data. (if I have understood correctly)
>
> But I'm wondering if there is any parameter to adjust the computation to do more windows or less windows considering the same data.
>
> I have my event that provide "millis" of which I would like to pass to the function but I don't understand how, for the moment I'm trying with the code below but no luck.. Can you please give me some help? Thanks!
>
>
>          FlinkKafkaConsumer<Event> kafkaData =
>                  new FlinkKafkaConsumer("CorID_1", new EventDeserializationSchema(), p);
>          WatermarkStrategy<Event> wmStrategy =
>                  WatermarkStrategy
>                          .<Event>forMonotonousTimestamps()
>                          .withIdleness(Duration.ofMinutes(1))
>                          .withTimestampAssigner((event, timestamp) -> { return event.get_Time();
>
>                          });
>
>          DataStream<Event> stream = env.addSource(
>                  kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>
>
>          DataStream<Event> Data = stream
>                  .keyBy((Event ride) -> ride.CorrID)
>                  .window(EventTimeSessionWindows.withDynamicGap((event)->{
>                      return event.get_Time();}));
>
>
>
> Where from the load of the message which i receive from Kafka i convert the date time in millis.
>
>   public long get_Time() {
>          long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
>          this.millis = tn;
>          return millis;
>      }
>      public void set_a_t_rt(String a_t_rt) {
>          this.a_t_rt = a_t_rt;
>      }
>
>
>
>
>

Reply | Threaded
Open this post in threaded view
|

Re: How to use EventTimeSessionWindows.withDynamicGap()

Aljoscha Krettek
Yes, you're right that Flink can do this with session windows but the
assignment will be static. In general, the smaller the session gap (or
session timeout) the fewer windows there will be.

You're also right that you would have to somehow maintain information
about how dense you records are in time and then use that to adjust the
session gap. So you could use a stateful operation (like a
ProcessFunction) to put a dynamic "gap" into the records and then use
that gap with EventTimeSessionWindows.

Best,
Aljoscha

On 12.11.20 18:16, Simone Cavallarin wrote:

> Hi Aljoscha,
>
> Yes correct i would like to have more windows when there are more events for a given time frame. That is when
> the events are more dense in time. I can calculate the time difference between each event and create a parameter that can create windows of different sizes dynamically based on past events. Maybe on the beginning it will be starting for a fix parameter but then the parameter should be learning and accommodate the data accordingly
>
> Could you please give me an example on how to set the timeout?
>
> I have been reading all around and I'm a bit confused. I thought that flink can create more windows when the events are more dense in time quite easily (https://www.ververica.com/blog/session-windowing-in-flink ).
>
> [cid:85daf58a-bc3e-4f39-94c2-d14fe2bf9c16]
>
> To avoid having the successive sessions become bigger and bigger so should I  create a cap for example 1 min?
>
> Many thanks for the help!
> Best
> Simon
>
> ________________________________
> From: Aljoscha Krettek <[hidden email]>
> Sent: 12 November 2020 16:34
> To: [hidden email] <[hidden email]>
> Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
>
> Hi,
>
> I'm not sure that what you want is possible. You say you want more
> windows when there are more events for a given time frame? That is when
> the events are more dense in time?
>
> Also, using the event timestamp as the gap doesn't look correct. The gap
> basically specifies the timeout for a session (and I now realize that
> maybe "gap" is not a good word for that). So if your timeout increases
> as time goes on your successive sessions will just get bigger and bigger.
>
> Best,
> Aljoscha
>
> On 12.11.20 15:56, Simone Cavallarin wrote:
>> Hi All,
>>
>> I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I have understood that the gap is computed dynamically by a function on each element. What I should be able to obtain is a Flink application that can automatically manage the windows based on the frequency of the data. (if I have understood correctly)
>>
>> But I'm wondering if there is any parameter to adjust the computation to do more windows or less windows considering the same data.
>>
>> I have my event that provide "millis" of which I would like to pass to the function but I don't understand how, for the moment I'm trying with the code below but no luck.. Can you please give me some help? Thanks!
>>
>>
>>           FlinkKafkaConsumer<Event> kafkaData =
>>                   new FlinkKafkaConsumer("CorID_1", new EventDeserializationSchema(), p);
>>           WatermarkStrategy<Event> wmStrategy =
>>                   WatermarkStrategy
>>                           .<Event>forMonotonousTimestamps()
>>                           .withIdleness(Duration.ofMinutes(1))
>>                           .withTimestampAssigner((event, timestamp) -> { return event.get_Time();
>>
>>                           });
>>
>>           DataStream<Event> stream = env.addSource(
>>                   kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>>
>>
>>           DataStream<Event> Data = stream
>>                   .keyBy((Event ride) -> ride.CorrID)
>>                   .window(EventTimeSessionWindows.withDynamicGap((event)->{
>>                       return event.get_Time();}));
>>
>>
>>
>> Where from the load of the message which i receive from Kafka i convert the date time in millis.
>>
>>    public long get_Time() {
>>           long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
>>           this.millis = tn;
>>           return millis;
>>       }
>>       public void set_a_t_rt(String a_t_rt) {
>>           this.a_t_rt = a_t_rt;
>>       }
>>
>>
>>
>>
>>
>
>

Reply | Threaded
Open this post in threaded view
|

Re: How to use EventTimeSessionWindows.withDynamicGap()

SimAzz
+user@


From: Simone Cavallarin <[hidden email]>
Sent: 13 November 2020 16:46
To: Aljoscha Krettek <[hidden email]>
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
 
Hi Aljoscha,

When you said: You could use a stateful operation (like a ProcessFunction) to put a dynamic "gap" into the records and then use that gap with EventTimeSessionWindows. I understand the theory but I'm struggling to put in practice in code terms.

stream = steam
    .keyBy(new MyKeySelector())
    .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))
    .sideOutputLateData(lateDataSideOutputTag)
    .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10))) // in case some key is continuously coming within the session window gap
    .process(new ProcessWindowFunction(……));
Where ProcessWindowFunction(……)update a parameter that is used inside DynamicWindowGapExtractor()...
I found this on the following link: https://stackoverflow.com/questions/61960485/flink-session-window-not-triggered-even-with-continuouseventtimetrigger
If you could help me with some examples where i can read some code it would be so helpful.

Thanks!


From: Aljoscha Krettek <[hidden email]>
Sent: 13 November 2020 09:43
To: [hidden email] <[hidden email]>
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
 
Yes, you're right that Flink can do this with session windows but the
assignment will be static. In general, the smaller the session gap (or
session timeout) the fewer windows there will be.

You're also right that you would have to somehow maintain information
about how dense you records are in time and then use that to adjust the
session gap. So you could use a stateful operation (like a
ProcessFunction) to put a dynamic "gap" into the records and then use
that gap with EventTimeSessionWindows.

Best,
Aljoscha

On 12.11.20 18:16, Simone Cavallarin wrote:
> Hi Aljoscha,
>
> Yes correct i would like to have more windows when there are more events for a given time frame. That is when
> the events are more dense in time. I can calculate the time difference between each event and create a parameter that can create windows of different sizes dynamically based on past events. Maybe on the beginning it will be starting for a fix parameter but then the parameter should be learning and accommodate the data accordingly
>
> Could you please give me an example on how to set the timeout?
>
> I have been reading all around and I'm a bit confused. I thought that flink can create more windows when the events are more dense in time quite easily (https://apc01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fwww.ververica.com%2Fblog%2Fsession-windowing-in-flink&amp;data=04%7C01%7C%7Cdb1c633bb89c45e523ac08d887b8a636%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637408574413261082%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=BniIILdwiAykEhRIOd5ZdaRl%2Ftvhnr2Q88SeCnWxrT4%3D&amp;reserved=0 ).
>
> [cid:85daf58a-bc3e-4f39-94c2-d14fe2bf9c16]
>
> To avoid having the successive sessions become bigger and bigger so should I  create a cap for example 1 min?
>
> Many thanks for the help!
> Best
> Simon
>
> ________________________________
> From: Aljoscha Krettek <[hidden email]>
> Sent: 12 November 2020 16:34
> To: [hidden email] <[hidden email]>
> Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
>
> Hi,
>
> I'm not sure that what you want is possible. You say you want more
> windows when there are more events for a given time frame? That is when
> the events are more dense in time?
>
> Also, using the event timestamp as the gap doesn't look correct. The gap
> basically specifies the timeout for a session (and I now realize that
> maybe "gap" is not a good word for that). So if your timeout increases
> as time goes on your successive sessions will just get bigger and bigger.
>
> Best,
> Aljoscha
>
> On 12.11.20 15:56, Simone Cavallarin wrote:
>> Hi All,
>>
>> I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I have understood that the gap is computed dynamically by a function on each element. What I should be able to obtain is a Flink application that can automatically manage the windows based on the frequency of the data. (if I have understood correctly)
>>
>> But I'm wondering if there is any parameter to adjust the computation to do more windows or less windows considering the same data.
>>
>> I have my event that provide "millis" of which I would like to pass to the function but I don't understand how, for the moment I'm trying with the code below but no luck.. Can you please give me some help? Thanks!
>>
>>
>>           FlinkKafkaConsumer<Event> kafkaData =
>>                   new FlinkKafkaConsumer("CorID_1", new EventDeserializationSchema(), p);
>>           WatermarkStrategy<Event> wmStrategy =
>>                   WatermarkStrategy
>>                           .<Event>forMonotonousTimestamps()
>>                           .withIdleness(Duration.ofMinutes(1))
>>                           .withTimestampAssigner((event, timestamp) -> { return event.get_Time();
>>
>>                           });
>>
>>           DataStream<Event> stream = env.addSource(
>>                   kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>>
>>
>>           DataStream<Event> Data = stream
>>                   .keyBy((Event ride) -> ride.CorrID)
>>                   .window(EventTimeSessionWindows.withDynamicGap((event)->{
>>                       return event.get_Time();}));
>>
>>
>>
>> Where from the load of the message which i receive from Kafka i convert the date time in millis.
>>
>>    public long get_Time() {
>>           long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
>>           this.millis = tn;
>>           return millis;
>>       }
>>       public void set_a_t_rt(String a_t_rt) {
>>           this.a_t_rt = a_t_rt;
>>       }
>>
>>
>>
>>
>>
>
>

Reply | Threaded
Open this post in threaded view
|

Re: How to use EventTimeSessionWindows.withDynamicGap()

SimAzz
Hi Aljoscha,

I found a similar question of mine by KristoffSC

The idea is the same and at the end of the thread this was the solution that you suggested: "There are no plans of adding state support to the gap extractors but you could do this using a two-step approach, i.e. have an operation in front of the window that keeps track of session gaps, enriches the message with the gap that should be used and then the extractor extracts that gap. This is a more modular approach compared to putting everything in one operator/extractor."


1) Operation in front of the windows -> keep track of the session gaps (I have been reading all around for this)
2) Enrich the message with the gap that should be use (this is a parameter can be for example an average of the last 10 gaps?)
  • (I got lost.) How can I enrich a message coming from Kafka, maybe adding this parameter to the next event?

3) The extractor extract the gap (that will be used to calculate a new gap parameter so it needs to be sent back on point 1 and be used on the windowing process)

  • (mmmmm.. okay now complitely lost...)

Thanks
s


From: Simone Cavallarin <[hidden email]>
Sent: 13 November 2020 16:55
To: Aljoscha Krettek <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
 
+user@


From: Simone Cavallarin <[hidden email]>
Sent: 13 November 2020 16:46
To: Aljoscha Krettek <[hidden email]>
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
 
Hi Aljoscha,

When you said: You could use a stateful operation (like a ProcessFunction) to put a dynamic "gap" into the records and then use that gap with EventTimeSessionWindows. I understand the theory but I'm struggling to put in practice in code terms.

stream = steam
    .keyBy(new MyKeySelector())
    .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))
    .sideOutputLateData(lateDataSideOutputTag)
    .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10))) // in case some key is continuously coming within the session window gap
    .process(new ProcessWindowFunction(……));
Where ProcessWindowFunction(……)update a parameter that is used inside DynamicWindowGapExtractor()...
I found this on the following link: https://stackoverflow.com/questions/61960485/flink-session-window-not-triggered-even-with-continuouseventtimetrigger
If you could help me with some examples where i can read some code it would be so helpful.

Thanks!


From: Aljoscha Krettek <[hidden email]>
Sent: 13 November 2020 09:43
To: [hidden email] <[hidden email]>
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
 
Yes, you're right that Flink can do this with session windows but the
assignment will be static. In general, the smaller the session gap (or
session timeout) the fewer windows there will be.

You're also right that you would have to somehow maintain information
about how dense you records are in time and then use that to adjust the
session gap. So you could use a stateful operation (like a
ProcessFunction) to put a dynamic "gap" into the records and then use
that gap with EventTimeSessionWindows.

Best,
Aljoscha

On 12.11.20 18:16, Simone Cavallarin wrote:
> Hi Aljoscha,
>
> Yes correct i would like to have more windows when there are more events for a given time frame. That is when
> the events are more dense in time. I can calculate the time difference between each event and create a parameter that can create windows of different sizes dynamically based on past events. Maybe on the beginning it will be starting for a fix parameter but then the parameter should be learning and accommodate the data accordingly
>
> Could you please give me an example on how to set the timeout?
>
> I have been reading all around and I'm a bit confused. I thought that flink can create more windows when the events are more dense in time quite easily (https://apc01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fwww.ververica.com%2Fblog%2Fsession-windowing-in-flink&amp;data=04%7C01%7C%7Cdb1c633bb89c45e523ac08d887b8a636%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637408574413261082%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=BniIILdwiAykEhRIOd5ZdaRl%2Ftvhnr2Q88SeCnWxrT4%3D&amp;reserved=0 ).
>
> [cid:85daf58a-bc3e-4f39-94c2-d14fe2bf9c16]
>
> To avoid having the successive sessions become bigger and bigger so should I  create a cap for example 1 min?
>
> Many thanks for the help!
> Best
> Simon
>
> ________________________________
> From: Aljoscha Krettek <[hidden email]>
> Sent: 12 November 2020 16:34
> To: [hidden email] <[hidden email]>
> Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
>
> Hi,
>
> I'm not sure that what you want is possible. You say you want more
> windows when there are more events for a given time frame? That is when
> the events are more dense in time?
>
> Also, using the event timestamp as the gap doesn't look correct. The gap
> basically specifies the timeout for a session (and I now realize that
> maybe "gap" is not a good word for that). So if your timeout increases
> as time goes on your successive sessions will just get bigger and bigger.
>
> Best,
> Aljoscha
>
> On 12.11.20 15:56, Simone Cavallarin wrote:
>> Hi All,
>>
>> I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I have understood that the gap is computed dynamically by a function on each element. What I should be able to obtain is a Flink application that can automatically manage the windows based on the frequency of the data. (if I have understood correctly)
>>
>> But I'm wondering if there is any parameter to adjust the computation to do more windows or less windows considering the same data.
>>
>> I have my event that provide "millis" of which I would like to pass to the function but I don't understand how, for the moment I'm trying with the code below but no luck.. Can you please give me some help? Thanks!
>>
>>
>>           FlinkKafkaConsumer<Event> kafkaData =
>>                   new FlinkKafkaConsumer("CorID_1", new EventDeserializationSchema(), p);
>>           WatermarkStrategy<Event> wmStrategy =
>>                   WatermarkStrategy
>>                           .<Event>forMonotonousTimestamps()
>>                           .withIdleness(Duration.ofMinutes(1))
>>                           .withTimestampAssigner((event, timestamp) -> { return event.get_Time();
>>
>>                           });
>>
>>           DataStream<Event> stream = env.addSource(
>>                   kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>>
>>
>>           DataStream<Event> Data = stream
>>                   .keyBy((Event ride) -> ride.CorrID)
>>                   .window(EventTimeSessionWindows.withDynamicGap((event)->{
>>                       return event.get_Time();}));
>>
>>
>>
>> Where from the load of the message which i receive from Kafka i convert the date time in millis.
>>
>>    public long get_Time() {
>>           long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
>>           this.millis = tn;
>>           return millis;
>>       }
>>       public void set_a_t_rt(String a_t_rt) {
>>           this.a_t_rt = a_t_rt;
>>       }
>>
>>
>>
>>
>>
>
>


Le




Reply | Threaded
Open this post in threaded view
|

Re: How to use EventTimeSessionWindows.withDynamicGap()

Aljoscha Krettek
Hi,

thanks for the pointer, I should have remembered that thread earlier!

I'll try and sketch what the pipeline might look like to show what I
mean by "enriching the message" and where the operations would sit.

DataStream<MyMessageType> source = <Kafka Source>

DataStream<Tuple2<MyMessageType, Long>> enriched = source
   .keyBy(<key extractor>)
   .map(new StatefulSessionCalculator()); // or process()

DataStream<...> result = enriched
   .keyBy(new MyKeySelector())
   .window(EventTimeSessionWindows.withDynamicGap(
     new DynamicWindowGapExtractor()))
   .sideOutputLateData(lateDataSideOutputTag)
   .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10)))
   .process(new ProcessWindowFunction(...));

The stateful map function could look something like this:

Tuple2<MyMessageType, Long> map(MyMessageType input) {
   ValueState<MyState> valueState = getState(myModelStateDescriptor);
   MyState state = valueState.value()
   state.update(input);
   long suggestedGap = state.getSuggestedGap();
   valueState.update(state);
   return Tuple2.of(input, suggestedGap);
}

The two operations have to be separate because the session gap extractor
cannot be stateful.

I think, however, that it might be easier at this point to just use a
stateful ProcessFunction to not have to deal with the somewhat finicky
setup of the stateful extractor just to force it into the requirements
of the session windows API.

Best,
Aljoscha

On 14.11.20 10:50, Simone Cavallarin wrote:

> Hi Aljoscha,
>
> I found a similar question of mine by KristoffSC<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=user_nodes&user=2311> Jan, 2020, called Session Windows with dynamic gap.
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Session-Window-with-dynamic-gap-td31893.html
>
> The idea is the same and at the end of the thread this was the solution that you suggested: "There are no plans of adding state support to the gap extractors but you could do this using a two-step approach, i.e. have an operation in front of the window that keeps track of session gaps, enriches the message with the gap that should be used and then the extractor extracts that gap. This is a more modular approach compared to putting everything in one operator/extractor."
>
>
> 1) Operation in front of the windows -> keep track of the session gaps (I have been reading all around for this)
>
>    *   (https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.html)
>    *   https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
>    *   https://www.codota.com/code/java/classes/org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor
>
>
> 2) Enrich the message with the gap that should be use (this is a parameter can be for example an average of the last 10 gaps?)
>
>    *   (I got lost.) How can I enrich a message coming from Kafka, maybe adding this parameter to the next event?
>
> 3) The extractor extract the gap (that will be used to calculate a new gap parameter so it needs to be sent back on point 1 and be used on the windowing process)
>
>
>    *   (mmmmm.. okay now complitely lost...)
>
> Thanks
> s
>
> ________________________________
> From: Simone Cavallarin <[hidden email]>
> Sent: 13 November 2020 16:55
> To: Aljoscha Krettek <[hidden email]>
> Cc: user <[hidden email]>
> Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
>
> +user@
>
> ________________________________
> From: Simone Cavallarin <[hidden email]>
> Sent: 13 November 2020 16:46
> To: Aljoscha Krettek <[hidden email]>
> Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
>
> Hi Aljoscha,
>
> When you said: You could use a stateful operation (like a ProcessFunction) to put a dynamic "gap" into the records and then use that gap with EventTimeSessionWindows. I understand the theory but I'm struggling to put in practice in code terms.
>
> <https://stackoverflow.com/questions/61960485/flink-session-window-not-triggered-even-with-continuouseventtimetrigger>
>
> stream = steam
>      .keyBy(new MyKeySelector())
>      .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))
>      .sideOutputLateData(lateDataSideOutputTag)
>      .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10))) // in case some key is continuously coming within the session window gap
>      .process(new ProcessWindowFunction(……));
>
>
> Where ProcessWindowFunction(……)update a parameter that is used inside DynamicWindowGapExtractor()...
>
> I found this on the following link: https://stackoverflow.com/questions/61960485/flink-session-window-not-triggered-even-with-continuouseventtimetrigger
>
> If you could help me with some examples where i can read some code it would be so helpful.
>
> Thanks!
>
> ________________________________
> From: Aljoscha Krettek <[hidden email]>
> Sent: 13 November 2020 09:43
> To: [hidden email] <[hidden email]>
> Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
>
> Yes, you're right that Flink can do this with session windows but the
> assignment will be static. In general, the smaller the session gap (or
> session timeout) the fewer windows there will be.
>
> You're also right that you would have to somehow maintain information
> about how dense you records are in time and then use that to adjust the
> session gap. So you could use a stateful operation (like a
> ProcessFunction) to put a dynamic "gap" into the records and then use
> that gap with EventTimeSessionWindows.
>
> Best,
> Aljoscha
>
> On 12.11.20 18:16, Simone Cavallarin wrote:
>> Hi Aljoscha,
>>
>> Yes correct i would like to have more windows when there are more events for a given time frame. That is when
>> the events are more dense in time. I can calculate the time difference between each event and create a parameter that can create windows of different sizes dynamically based on past events. Maybe on the beginning it will be starting for a fix parameter but then the parameter should be learning and accommodate the data accordingly
>>
>> Could you please give me an example on how to set the timeout?
>>
>> I have been reading all around and I'm a bit confused. I thought that flink can create more windows when the events are more dense in time quite easily (https://apc01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fwww.ververica.com%2Fblog%2Fsession-windowing-in-flink&amp;data=04%7C01%7C%7Cdb1c633bb89c45e523ac08d887b8a636%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637408574413261082%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=BniIILdwiAykEhRIOd5ZdaRl%2Ftvhnr2Q88SeCnWxrT4%3D&amp;reserved=0 ).
>>
>> [cid:85daf58a-bc3e-4f39-94c2-d14fe2bf9c16]
>>
>> To avoid having the successive sessions become bigger and bigger so should I  create a cap for example 1 min?
>>
>> Many thanks for the help!
>> Best
>> Simon
>>
>> ________________________________
>> From: Aljoscha Krettek <[hidden email]>
>> Sent: 12 November 2020 16:34
>> To: [hidden email] <[hidden email]>
>> Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
>>
>> Hi,
>>
>> I'm not sure that what you want is possible. You say you want more
>> windows when there are more events for a given time frame? That is when
>> the events are more dense in time?
>>
>> Also, using the event timestamp as the gap doesn't look correct. The gap
>> basically specifies the timeout for a session (and I now realize that
>> maybe "gap" is not a good word for that). So if your timeout increases
>> as time goes on your successive sessions will just get bigger and bigger.
>>
>> Best,
>> Aljoscha
>>
>> On 12.11.20 15:56, Simone Cavallarin wrote:
>>> Hi All,
>>>
>>> I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I have understood that the gap is computed dynamically by a function on each element. What I should be able to obtain is a Flink application that can automatically manage the windows based on the frequency of the data. (if I have understood correctly)
>>>
>>> But I'm wondering if there is any parameter to adjust the computation to do more windows or less windows considering the same data.
>>>
>>> I have my event that provide "millis" of which I would like to pass to the function but I don't understand how, for the moment I'm trying with the code below but no luck.. Can you please give me some help? Thanks!
>>>
>>>
>>>            FlinkKafkaConsumer<Event> kafkaData =
>>>                    new FlinkKafkaConsumer("CorID_1", new EventDeserializationSchema(), p);
>>>            WatermarkStrategy<Event> wmStrategy =
>>>                    WatermarkStrategy
>>>                            .<Event>forMonotonousTimestamps()
>>>                            .withIdleness(Duration.ofMinutes(1))
>>>                            .withTimestampAssigner((event, timestamp) -> { return event.get_Time();
>>>
>>>                            });
>>>
>>>            DataStream<Event> stream = env.addSource(
>>>                    kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>>>
>>>
>>>            DataStream<Event> Data = stream
>>>                    .keyBy((Event ride) -> ride.CorrID)
>>>                    .window(EventTimeSessionWindows.withDynamicGap((event)->{
>>>                        return event.get_Time();}));
>>>
>>>
>>>
>>> Where from the load of the message which i receive from Kafka i convert the date time in millis.
>>>
>>>     public long get_Time() {
>>>            long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
>>>            this.millis = tn;
>>>            return millis;
>>>        }
>>>        public void set_a_t_rt(String a_t_rt) {
>>>            this.a_t_rt = a_t_rt;
>>>        }
>>>
>>>
>>>
>>>
>>>
>>
>>
>
>
> Le
>
>
>
>
>

Reply | Threaded
Open this post in threaded view
|

Re: How to use EventTimeSessionWindows.withDynamicGap()

SimAzz
Hi,

I have been working on the suggestion that you gave me, thanks! The first part is to add to the message the gap. 1)I receive the event, 2)I take that event and I map it using  StatefulsessionCalculator, that is where I put together "The message", and "long" that is my gap in millis.

DataStream<Event> source = <Kafka Source>

Operation in front of the window that keeps track of session gaps

DataStream<Tuple2<MyMessageType, Long>> enriched = source
   .keyBy(<key extractor>)
   .map(new StatefulSessionCalculator()); // or process()

This is my StatefulSessionCalculator():

Tuple2<MyMessageType, Long> map(MyMessageType input) {
   ValueState<MyState> valueState = getState(myModelStateDescriptor);
MyState state = valueState.value()
   state.update(input);
   long suggestedGap = state.getSuggestedGap();
   valueState.update(state);
   return Tuple2.of(input, suggestedGap);
}


If the "gap" calculated is "1234".
The result would be: [Tom, 1.70, 50, 1605612588995], [1234]>?


The second step is to use the gap calculated through  DynamicWindowGapExtractor().

DataStream<...> result = enriched
   .keyBy(new MyKeySelector())
   .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))


The DynamicWindowGapExtractor() extract the gap from the message and feed it back to Flink.
Could you please give me an example also for this one?


One thing that I don't understand is that after enriching the message my event that contain a POJO is nested inside tuple. How can I access it?

This is my code,




Before the POJO was working fine using "stream" but now that I'm going through a Tuple2 i have some issues.




The last point when you said: "I think, however, that it might be easier at this point to just use a stateful ProcessFunction", you meant a completely different approach, would be better?

many thanks for the help.

s


From: Aljoscha Krettek <[hidden email]>
Sent: 16 November 2020 16:22
To: [hidden email] <[hidden email]>
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
 
Hi,

thanks for the pointer, I should have remembered that thread earlier!

I'll try and sketch what the pipeline might look like to show what I
mean by "enriching the message" and where the operations would sit.

DataStream<MyMessageType> source = <Kafka Source>

DataStream<Tuple2<MyMessageType, Long>> enriched = source
   .keyBy(<key extractor>)
   .map(new StatefulSessionCalculator()); // or process()

DataStream<...> result = enriched
   .keyBy(new MyKeySelector())
   .window(EventTimeSessionWindows.withDynamicGap(
     new DynamicWindowGapExtractor()))
   .sideOutputLateData(lateDataSideOutputTag)
   .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10)))
   .process(new ProcessWindowFunction(...));

The stateful map function could look something like this:

Tuple2<MyMessageType, Long> map(MyMessageType input) {
   ValueState<MyState> valueState = getState(myModelStateDescriptor);
   MyState state = valueState.value()
   state.update(input);
   long suggestedGap = state.getSuggestedGap();
   valueState.update(state);
   return Tuple2.of(input, suggestedGap);
}

The two operations have to be separate because the session gap extractor
cannot be stateful.

I think, however, that it might be easier at this point to just use a
stateful ProcessFunction to not have to deal with the somewhat finicky
setup of the stateful extractor just to force it into the requirements
of the session windows API.

Best,
Aljoscha

On 14.11.20 10:50, Simone Cavallarin wrote:
> Hi Aljoscha,
>
> I found a similar question of mine by KristoffSC<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=user_nodes&user=2311> Jan, 2020, called Session Windows with dynamic gap.
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Session-Window-with-dynamic-gap-td31893.html
>
> The idea is the same and at the end of the thread this was the solution that you suggested: "There are no plans of adding state support to the gap extractors but you could do this using a two-step approach, i.e. have an operation in front of the window that keeps track of session gaps, enriches the message with the gap that should be used and then the extractor extracts that gap. This is a more modular approach compared to putting everything in one operator/extractor."
>
>
> 1) Operation in front of the windows -> keep track of the session gaps (I have been reading all around for this)
>
>    *   (https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.html)
>    *   https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
>    *   https://www.codota.com/code/java/classes/org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor
>
>
> 2) Enrich the message with the gap that should be use (this is a parameter can be for example an average of the last 10 gaps?)
>
>    *   (I got lost.) How can I enrich a message coming from Kafka, maybe adding this parameter to the next event?
>
> 3) The extractor extract the gap (that will be used to calculate a new gap parameter so it needs to be sent back on point 1 and be used on the windowing process)
>
>
>    *   (mmmmm.. okay now complitely lost...)
>
> Thanks
> s
>
> ________________________________
> From: Simone Cavallarin <[hidden email]>
> Sent: 13 November 2020 16:55
> To: Aljoscha Krettek <[hidden email]>
> Cc: user <[hidden email]>
> Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
>
> +user@
>
> ________________________________
> From: Simone Cavallarin <[hidden email]>
> Sent: 13 November 2020 16:46
> To: Aljoscha Krettek <[hidden email]>
> Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
>
> Hi Aljoscha,
>
> When you said: You could use a stateful operation (like a ProcessFunction) to put a dynamic "gap" into the records and then use that gap with EventTimeSessionWindows. I understand the theory but I'm struggling to put in practice in code terms.
>
> <https://stackoverflow.com/questions/61960485/flink-session-window-not-triggered-even-with-continuouseventtimetrigger>
>
> stream = steam
>      .keyBy(new MyKeySelector())
>      .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))
>      .sideOutputLateData(lateDataSideOutputTag)
>      .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10))) // in case some key is continuously coming within the session window gap
>      .process(new ProcessWindowFunction(……));
>
>
> Where ProcessWindowFunction(……)update a parameter that is used inside DynamicWindowGapExtractor()...
>
> I found this on the following link: https://stackoverflow.com/questions/61960485/flink-session-window-not-triggered-even-with-continuouseventtimetrigger
>
> If you could help me with some examples where i can read some code it would be so helpful.
>
> Thanks!
>
> ________________________________
> From: Aljoscha Krettek <[hidden email]>
> Sent: 13 November 2020 09:43
> To: [hidden email] <[hidden email]>
> Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
>
> Yes, you're right that Flink can do this with session windows but the
> assignment will be static. In general, the smaller the session gap (or
> session timeout) the fewer windows there will be.
>
> You're also right that you would have to somehow maintain information
> about how dense you records are in time and then use that to adjust the
> session gap. So you could use a stateful operation (like a
> ProcessFunction) to put a dynamic "gap" into the records and then use
> that gap with EventTimeSessionWindows.
>
> Best,
> Aljoscha
>
> On 12.11.20 18:16, Simone Cavallarin wrote:
>> Hi Aljoscha,
>>
>> Yes correct i would like to have more windows when there are more events for a given time frame. That is when
>> the events are more dense in time. I can calculate the time difference between each event and create a parameter that can create windows of different sizes dynamically based on past events. Maybe on the beginning it will be starting for a fix parameter but then the parameter should be learning and accommodate the data accordingly
>>
>> Could you please give me an example on how to set the timeout?
>>
>> I have been reading all around and I'm a bit confused. I thought that flink can create more windows when the events are more dense in time quite easily (https://apc01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fwww.ververica.com%2Fblog%2Fsession-windowing-in-flink&amp;data=04%7C01%7C%7Cdb1c633bb89c45e523ac08d887b8a636%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637408574413261082%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=BniIILdwiAykEhRIOd5ZdaRl%2Ftvhnr2Q88SeCnWxrT4%3D&amp;reserved=0 ).
>>
>> [cid:85daf58a-bc3e-4f39-94c2-d14fe2bf9c16]
>>
>> To avoid having the successive sessions become bigger and bigger so should I  create a cap for example 1 min?
>>
>> Many thanks for the help!
>> Best
>> Simon
>>
>> ________________________________
>> From: Aljoscha Krettek <[hidden email]>
>> Sent: 12 November 2020 16:34
>> To: [hidden email] <[hidden email]>
>> Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
>>
>> Hi,
>>
>> I'm not sure that what you want is possible. You say you want more
>> windows when there are more events for a given time frame? That is when
>> the events are more dense in time?
>>
>> Also, using the event timestamp as the gap doesn't look correct. The gap
>> basically specifies the timeout for a session (and I now realize that
>> maybe "gap" is not a good word for that). So if your timeout increases
>> as time goes on your successive sessions will just get bigger and bigger.
>>
>> Best,
>> Aljoscha
>>
>> On 12.11.20 15:56, Simone Cavallarin wrote:
>>> Hi All,
>>>
>>> I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I have understood that the gap is computed dynamically by a function on each element. What I should be able to obtain is a Flink application that can automatically manage the windows based on the frequency of the data. (if I have understood correctly)
>>>
>>> But I'm wondering if there is any parameter to adjust the computation to do more windows or less windows considering the same data.
>>>
>>> I have my event that provide "millis" of which I would like to pass to the function but I don't understand how, for the moment I'm trying with the code below but no luck.. Can you please give me some help? Thanks!
>>>
>>>
>>>            FlinkKafkaConsumer<Event> kafkaData =
>>>                    new FlinkKafkaConsumer("CorID_1", new EventDeserializationSchema(), p);
>>>            WatermarkStrategy<Event> wmStrategy =
>>>                    WatermarkStrategy
>>>                            .<Event>forMonotonousTimestamps()
>>>                            .withIdleness(Duration.ofMinutes(1))
>>>                            .withTimestampAssigner((event, timestamp) -> { return event.get_Time();
>>>
>>>                            });
>>>
>>>            DataStream<Event> stream = env.addSource(
>>>                    kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>>>
>>>
>>>            DataStream<Event> Data = stream
>>>                    .keyBy((Event ride) -> ride.CorrID)
>>>                    .window(EventTimeSessionWindows.withDynamicGap((event)->{
>>>                        return event.get_Time();}));
>>>
>>>
>>>
>>> Where from the load of the message which i receive from Kafka i convert the date time in millis.
>>>
>>>     public long get_Time() {
>>>            long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
>>>            this.millis = tn;
>>>            return millis;
>>>        }
>>>        public void set_a_t_rt(String a_t_rt) {
>>>            this.a_t_rt = a_t_rt;
>>>        }
>>>
>>>
>>>
>>>
>>>
>>
>>
>
>
> Le
>
>
>
>
>

Reply | Threaded
Open this post in threaded view
|

Re: How to use EventTimeSessionWindows.withDynamicGap()

Aljoscha Krettek
On 17.11.20 17:37, Simone Cavallarin wrote:

> Hi,
>
> I have been working on the suggestion that you gave me, thanks! The first part is to add to the message the gap. 1)I receive the event, 2)I take that event and I map it using  StatefulsessionCalculator, that is where I put together "The message", and "long" that is my gap in millis.
>
> DataStream<Event> source = <Kafka Source>
>
> Operation in front of the window that keeps track of session gaps
>
> DataStream<Tuple2<MyMessageType, Long>> enriched = source
>     .keyBy(<key extractor>)
>     .map(new StatefulSessionCalculator()); // or process()
>
> This is my StatefulSessionCalculator():
>
> Tuple2<MyMessageType, Long> map(MyMessageType input) {
>     ValueState<MyState> valueState = getState(myModelStateDescriptor);
> MyState state = valueState.value()
>     state.update(input);
>     long suggestedGap = state.getSuggestedGap();
>     valueState.update(state);
>     return Tuple2.of(input, suggestedGap);
> }
>
> If the "gap" calculated is "1234".
> The result would be: [Tom, 1.70, 50, 1605612588995], [1234]>?

That looks correct, yes.

> The second step is to use the gap calculated through  DynamicWindowGapExtractor().
>
> DataStream<...> result = enriched
>     .keyBy(new MyKeySelector())
>     .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))
>
>
> The DynamicWindowGapExtractor() extract the gap from the message and feed it back to Flink.
> Could you please give me an example also for this one?

This would just be class that extends
SessionWindowTimeGapExtractor<Tuple2<MyEvent, Long>> and returns the gap
from the extract() method.

> One thing that I don't understand is that after enriching the message my event that contain a POJO is nested inside tuple. How can I access it?

You would just read the first field of the tuple, i.e. tuple.f0.


> The last point when you said: "I think, however, that it might be easier at this point to just use a stateful ProcessFunction", you meant a completely different approach, would be better?

That's what I meant yes. Because it seems to complicated to split the
logic into the part that determines the dynamic gap and then another
part that does the computation per session. It seems easier to just roll
that into one operator that does everything. And with state and timers
you should have enough flexibility.

Best,
Aljoscha

Reply | Threaded
Open this post in threaded view
|

Re: How to use EventTimeSessionWindows.withDynamicGap()

SimAzz
Many thanks for the Help!!

Simone


From: Aljoscha Krettek <[hidden email]>
Sent: 19 November 2020 11:46
To: [hidden email] <[hidden email]>
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
 
On 17.11.20 17:37, Simone Cavallarin wrote:
> Hi,
>
> I have been working on the suggestion that you gave me, thanks! The first part is to add to the message the gap. 1)I receive the event, 2)I take that event and I map it using  StatefulsessionCalculator, that is where I put together "The message", and "long" that is my gap in millis.
>
> DataStream<Event> source = <Kafka Source>
>
> Operation in front of the window that keeps track of session gaps
>
> DataStream<Tuple2<MyMessageType, Long>> enriched = source
>     .keyBy(<key extractor>)
>     .map(new StatefulSessionCalculator()); // or process()
>
> This is my StatefulSessionCalculator():
>
> Tuple2<MyMessageType, Long> map(MyMessageType input) {
>     ValueState<MyState> valueState = getState(myModelStateDescriptor);
> MyState state = valueState.value()
>     state.update(input);
>     long suggestedGap = state.getSuggestedGap();
>     valueState.update(state);
>     return Tuple2.of(input, suggestedGap);
> }
>
> If the "gap" calculated is "1234".
> The result would be: [Tom, 1.70, 50, 1605612588995], [1234]>?

That looks correct, yes.

> The second step is to use the gap calculated through  DynamicWindowGapExtractor().
>
> DataStream<...> result = enriched
>     .keyBy(new MyKeySelector())
>     .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))
>
>
> The DynamicWindowGapExtractor() extract the gap from the message and feed it back to Flink.
> Could you please give me an example also for this one?

This would just be class that extends
SessionWindowTimeGapExtractor<Tuple2<MyEvent, Long>> and returns the gap
from the extract() method.

> One thing that I don't understand is that after enriching the message my event that contain a POJO is nested inside tuple. How can I access it?

You would just read the first field of the tuple, i.e. tuple.f0.


> The last point when you said: "I think, however, that it might be easier at this point to just use a stateful ProcessFunction", you meant a completely different approach, would be better?

That's what I meant yes. Because it seems to complicated to split the
logic into the part that determines the dynamic gap and then another
part that does the computation per session. It seems easier to just roll
that into one operator that does everything. And with state and timers
you should have enough flexibility.

Best,
Aljoscha

Reply | Threaded
Open this post in threaded view
|

Re: How to use EventTimeSessionWindows.withDynamicGap()

Aljoscha Krettek
Sure, my pleasure!

Aljoscha

On 19.11.20 16:12, Simone Cavallarin wrote:

> Many thanks for the Help!!
>
> Simone
>
> ________________________________
> From: Aljoscha Krettek <[hidden email]>
> Sent: 19 November 2020 11:46
> To: [hidden email] <[hidden email]>
> Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
>
> On 17.11.20 17:37, Simone Cavallarin wrote:
>> Hi,
>>
>> I have been working on the suggestion that you gave me, thanks! The first part is to add to the message the gap. 1)I receive the event, 2)I take that event and I map it using  StatefulsessionCalculator, that is where I put together "The message", and "long" that is my gap in millis.
>>
>> DataStream<Event> source = <Kafka Source>
>>
>> Operation in front of the window that keeps track of session gaps
>>
>> DataStream<Tuple2<MyMessageType, Long>> enriched = source
>>      .keyBy(<key extractor>)
>>      .map(new StatefulSessionCalculator()); // or process()
>>
>> This is my StatefulSessionCalculator():
>>
>> Tuple2<MyMessageType, Long> map(MyMessageType input) {
>>      ValueState<MyState> valueState = getState(myModelStateDescriptor);
>> MyState state = valueState.value()
>>      state.update(input);
>>      long suggestedGap = state.getSuggestedGap();
>>      valueState.update(state);
>>      return Tuple2.of(input, suggestedGap);
>> }
>>
>> If the "gap" calculated is "1234".
>> The result would be: [Tom, 1.70, 50, 1605612588995], [1234]>?
>
> That looks correct, yes.
>
>> The second step is to use the gap calculated through  DynamicWindowGapExtractor().
>>
>> DataStream<...> result = enriched
>>      .keyBy(new MyKeySelector())
>>      .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))
>>
>>
>> The DynamicWindowGapExtractor() extract the gap from the message and feed it back to Flink.
>> Could you please give me an example also for this one?
>
> This would just be class that extends
> SessionWindowTimeGapExtractor<Tuple2<MyEvent, Long>> and returns the gap
> from the extract() method.
>
>> One thing that I don't understand is that after enriching the message my event that contain a POJO is nested inside tuple. How can I access it?
>
> You would just read the first field of the tuple, i.e. tuple.f0.
>
>
>> The last point when you said: "I think, however, that it might be easier at this point to just use a stateful ProcessFunction", you meant a completely different approach, would be better?
>
> That's what I meant yes. Because it seems to complicated to split the
> logic into the part that determines the dynamic gap and then another
> part that does the computation per session. It seems easier to just roll
> that into one operator that does everything. And with state and timers
> you should have enough flexibility.
>
> Best,
> Aljoscha
>
>