Session Windows - not working as expected

Session Windows - not working as expected

Swagat Mishra
Hi,

A bit of background: I have a stream of customer purchase transactions that I read from a Kafka topic. I want to aggregate the number of products each customer has purchased in a particular duration (say 10 seconds) and write the result to a sink.

I am using session windows to achieve the above. 

For test purposes, I have mocked up a customer stream and applied session windows as shown below.

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
customerStream
    //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
    .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
    .keyBy(Customer::getIdentifier)
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .trigger(EventTimeTrigger.create())
    .evictor(new CustomerEvictor())
    .process(new CustomAggregateFunction())
    .print();

My watermark assigner looks like:

public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
    static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);

    @Override
    public WatermarkGenerator<Customer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new CustomWatermarkGenerator();
    }
}
I notice that the evictor and the aggregation function are called only once, for the first customer in the stream.
The data stream generates customers at 1-second intervals, and there are 5 customer keys for which it generates transactions.
Am I doing something wrong with the above?
I want to capture the event each time a transaction is added to or removed from the window so that I can perform the aggregation.
Please advise.



Sam

Re: Session Windows - not working as expected

Sam
Adding the code for CustomWatermarkGenerator

.....
@Override
public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
    currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
}

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
    watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
}
.....
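
For comparison, here is a minimal plain-Java sketch (no Flink dependency) of the watermark arithmetic behind a max-timestamp generator like the one above, extended with the out-of-orderness delay that the commented-out forBoundedOutOfOrderness line would add. The class WatermarkModel and its methods are hypothetical names for illustration. The formula "max timestamp minus delay minus 1" reflects my understanding of Flink's built-in bounded-out-of-orderness generator, so treat it as an assumption and check it against your Flink version.

```java
/** Hypothetical stand-alone model of a max-timestamp watermark generator. */
final class WatermarkModel {
    private final long maxOutOfOrdernessMs;
    private long maxTimestamp;

    WatermarkModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start high enough that subtracting the delay below cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Called for every event: remember the largest event time seen so far. */
    void onEvent(long eventTimeMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimeMs);
    }

    /** Called periodically: the watermark trails the largest event time by the delay. */
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        WatermarkModel wm = new WatermarkModel(7_000);
        long[] eventTimes = {10_000, 12_000, 11_000, 20_000};
        for (long t : eventTimes) {
            wm.onEvent(t);
            // The watermark never moves backwards, even for the out-of-order event 11_000.
            System.out.println("event=" + t + " watermark=" + wm.currentWatermark());
        }
    }
}
```

Printing the watermark next to each event in this way is also a cheap form of the debug output that helps confirm whether event time is actually progressing.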
 


Re: Session Windows - not working as expected

Swagat Mishra
This works fine in processing time but does not work in event time. Is there an issue with the way the watermark is defined, or do we need to set up timers?

Please advise.


WORKING:
customerStream
    .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
    .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
    .process(new CustomAggregateFunction());

NOT WORKING:
customerStream
    .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
    .keyBy(Customer::getIdentifier)
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .trigger(EventTimeTrigger.create())
    .evictor(new CustomerEvictor())
    .process(new CustomAggregateFunction())
    .print();


Re: Session Windows - not working as expected

Arvid Heise-4
Hi,

Is your CustomerGenerator setting the event timestamp correctly? Are your evictors evicting too early?

You can try to add some debug output into the watermark assigner and see if it's indeed progressing as expected.


Re: Session Windows - not working as expected

Swagat Mishra
Yes, the customer generator sets the event timestamp correctly, as you can see below. I debugged and found that the events are treated as late and are therefore never processed; i.e., in the window operator, this.isWindowLate(actualWindow) evaluates to false for all events except the first, so the remaining events are skipped. I am not able to figure out where exactly the issue is.

I have removed the evictor because I don't think I need it yet.

The stream looks like:
customerStream
    .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
    .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
    .window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
    .trigger(new EventTimeTrigger())
    .process(new CustomAggregateFunction());

Customer generator looks like:
while (isRunning) {
    Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
    System.out.println("Writing customer: " + c);
    sourceContext.collect(c);
    //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
    Thread.sleep(1000);
    counter++;
    if (counter % 11 == 0) {
        System.out.println("Sleeping for 10 seconds");
        Thread.sleep(10000);
    }
}

Custom Watermark generator has this:
.....
@Override
public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
    currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
}

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
    watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
}
.....
trigger looks like:
------

@Override
public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
    if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
        triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
        return TriggerResult.FIRE;
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
    // if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
    //     triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
    //     return TriggerResult.CONTINUE;
    // }

    return time == timeWindow.maxTimestamp() ?
        TriggerResult.FIRE :
        TriggerResult.CONTINUE;
}

....
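
One thing worth checking in the trigger above is the unit of the timer: LocalTime.toNanoOfDay() returns nanoseconds since midnight, whereas Flink event-time timestamps and timers are expressed in milliseconds. The sketch below compares the two for the same instant. It assumes that getEventTime() is meant to return epoch milliseconds in UTC, which the thread does not state; TimeUnits and toEpochMillis are hypothetical names.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;

final class TimeUnits {
    /** Hypothetical helper: combine a date and a time of day into epoch milliseconds (UTC). */
    static long toEpochMillis(LocalDate date, LocalTime time) {
        return date.atTime(time).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDate date = LocalDate.of(2021, 5, 6);
        LocalTime timerTime = LocalTime.of(12, 0, 0).plusSeconds(8);

        // Nanoseconds since midnight: a different unit and a different origin.
        long nanosOfDay = timerTime.toNanoOfDay();
        // Milliseconds since the epoch: comparable with millisecond event timestamps.
        long epochMillis = toEpochMillis(date, timerTime);

        System.out.println("nanoOfDay   = " + nanosOfDay);
        System.out.println("epochMillis = " + epochMillis);
    }
}
```

If the watermark is in epoch milliseconds, a timer registered with the nano-of-day value sits at an unrelated point on the time axis, so it would not fire when the window is expected to close.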


Re: Session Windows - not working as expected

Arvid Heise-4
Your source is not setting the timestamp with collectWithTimestamp. I'm assuming that nothing really moves from the event time's perspective.
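
To illustrate why missing record timestamps could make almost every window look late, here is a simplified plain-Java model. It rests on assumptions that should be verified against your Flink version: a record emitted with collect() carries no timestamp (modelled here as Long.MIN_VALUE), a session window for a single element spans [timestamp, timestamp + gap), and a window is dropped as late once its max timestamp plus the allowed lateness is at or below the current watermark. All names are hypothetical.

```java
final class LatenessModel {
    /** Assumed placeholder for "record has no timestamp". */
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    /** Largest timestamp that still belongs to the window [ts, ts + gap). */
    static long windowMaxTimestamp(long elementTimestamp, long gapMs) {
        return elementTimestamp + gapMs - 1;
    }

    /** Simplified lateness check: the window is late once the watermark has passed it. */
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLatenessMs, long watermark) {
        return windowMaxTimestamp + allowedLatenessMs <= watermark;
    }

    public static void main(String[] args) {
        long gap = 5_000;
        long lateness = 5_000;
        long watermark = 1_620_000_001_000L; // derived from the payload's event time

        // No record timestamp, no watermark emitted yet: the first element gets through.
        long first = windowMaxTimestamp(NO_TIMESTAMP, gap);
        System.out.println("first element late?   " + isWindowLate(first, lateness, Long.MIN_VALUE));

        // No record timestamp, watermark has advanced: the window is far in the past.
        System.out.println("later element late?   " + isWindowLate(first, lateness, watermark));

        // Record timestamp set (as with collectWithTimestamp): the window is ahead of the watermark.
        long stamped = windowMaxTimestamp(1_620_000_002_000L, gap);
        System.out.println("stamped element late? " + isWindowLate(stamped, lateness, watermark));
    }
}
```

Under these assumptions only the element that arrives before the first watermark is accepted, which would be consistent with the process function being called just once.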


Re: Session Windows - not working as expected

Swagat Mishra
Thank you.
sourceContext.collectWithTimestamp(c, c.getEventTime());
Adding this to the source context worked.
However, I am still getting only one customer in the process method. I would expect the iterable to provide all customers in the window. Or do I have to maintain state?

Changes for reference:
I made the following change and also removed any lag that I had introduced for experimentation earlier.
So the trigger looks like:

@Override
public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
    if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
        triggerContext.registerEventTimeTimer(customer.getEventTime());
        return TriggerResult.FIRE;
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
    return time == timeWindow.maxTimestamp() ?
        TriggerResult.FIRE :
        TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
}

@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    ctx.deleteEventTimeTimer(window.maxTimestamp());
}

@Override
public boolean canMerge() {
    return true;
}

I also removed the allowed lateness:
customerStream
    //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
    .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
    .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
    .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
    //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
    .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
    .process(new CustomAggregateFunction());


Re: Session Windows - not working as expected

Arvid Heise-4
If you use keyBy, every downstream function sees only the elements that share the same key. That is the expected behavior and the basis of Flink's parallel processing capabilities.

If you want to generate a window over all customers, you have to use a global window. However, that also means that no parallelization can happen, so I'd discourage that.

A better way would be to perform as many calculations as possible in the process function (for example, create a record that combines the customer with their purchase information) and then use a DataStream#global() reshuffle to collect all aggregated information on one node.
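
To illustrate the point about keyBy, here is a minimal plain-Java sketch with no Flink dependency. It only mimics the idea: events are grouped by key first, and each session is then built from one key's events alone, so a window function never sees customers with different keys together. The class KeyedSessionSketch, the Event type, and sessionSizesPerKey are hypothetical names, events are assumed to arrive in timestamp order, and Flink's actual window merging is more involved than this.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeyedSessionSketch {

    /** Hypothetical event: a customer key plus an event timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Groups events by key and splits each key's events into sessions.
     * A new session starts when the distance to the previous event of the
     * SAME key is larger than gapMs. Returns, per key, the number of
     * events in each of that key's sessions.
     */
    static Map<String, List<Integer>> sessionSizesPerKey(List<Event> events, long gapMs) {
        Map<String, List<Integer>> sizes = new LinkedHashMap<>();
        Map<String, Long> lastSeen = new LinkedHashMap<>();
        for (Event e : events) {
            List<Integer> sessions = sizes.computeIfAbsent(e.key, k -> new ArrayList<>());
            Long previous = lastSeen.get(e.key);
            if (previous == null || e.timestampMs - previous > gapMs) {
                sessions.add(1); // start a new session for this key
            } else {
                int last = sessions.size() - 1;
                sessions.set(last, sessions.get(last) + 1); // extend the current session
            }
            lastSeen.put(e.key, e.timestampMs);
        }
        return sizes;
    }
}
```

With a 5-second gap and five customer keys cycling at one event per second, each key receives an event exactly every 5 seconds, so how sessions form depends on boundary behavior and on the pauses in the generator; either way, every session contains a single key's events.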

On Thu, May 6, 2021 at 9:20 AM Swagat Mishra <[hidden email]> wrote:
Thank you.
sourceContext.collectWithTimestamp(c, c.getEventTime());
Adding this to the source context worked.
However, I am still getting only one customer in the process method. I would expect the iterable to provide all customers in the window. Or do I have to maintain state?

Changes for reference:
I made the following change and also removed any lag that I had introduced for experimentation earlier.
So the trigger looks like:

@Override
public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
//LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
triggerContext.registerEventTimeTimer(customer.getEventTime());
return TriggerResult.FIRE;
        }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
return time == timeWindow.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}

@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}

@Override
public boolean canMerge() {
return true;
}
and removed the allowed lateness:
customerStream
        //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
.keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
.window(EventTimeSessionWindows.withGap(Time.seconds(20)))
//.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
.process(new CustomAggregateFunction());

On Thu, May 6, 2021 at 12:32 PM Arvid Heise <[hidden email]> wrote:
Your source is not setting the timestamp with collectWithTimestamp. I'm assuming that nothing really moves from the event time's perspective.

On Thu, May 6, 2021 at 8:58 AM Swagat Mishra <[hidden email]> wrote:
Yes, the customer generator is setting the event timestamp correctly, as shown below. I debugged and found that the events are being treated as late, so they are never processed; i.e., in the window operator the method this.isWindowLate(actualWindow) evaluates to false for all events except the first, so the events are skipped. I am not able to figure out where exactly the issue is.
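
For readers following the lateness discussion, here is a rough plain-Java sketch of how a lateness check of this kind can behave. It is not Flink's source code. It assumes that a window counts as late once its maximum timestamp plus the allowed lateness is at or behind the watermark, that a session window for one element spans [timestamp, timestamp + gap), and that a record emitted without a timestamp carries Long.MIN_VALUE. All names (LatenessSketch, NO_TIMESTAMP, the helper methods) are hypothetical.

```java
final class LatenessSketch {

    /** Assumed marker for "record has no timestamp attached". */
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    /** Last timestamp that still belongs to a session window opened by one element. */
    static long sessionWindowMaxTimestamp(long elementTimestampMs, long gapMs) {
        return elementTimestampMs + gapMs - 1;
    }

    /** A window is treated as late once the watermark has passed its cleanup time. */
    static boolean isWindowLate(long windowMaxTimestampMs, long allowedLatenessMs, long watermarkMs) {
        return windowMaxTimestampMs + allowedLatenessMs <= watermarkMs;
    }
}
```

Under these assumptions, an element stamped with the same epoch-millisecond value as the current watermark is not late, while an element without a timestamp lands in a window far in the past and is late as soon as the watermark has advanced at all. That would be consistent with the advice elsewhere in the thread to emit records with collectWithTimestamp.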

I have removed the evictor because I don't think I need it yet.

The stream looks like:
customerStream
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
.keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
.window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
.trigger(new EventTimeTrigger())
.process(new CustomAggregateFunction());

Customer generator looks like:
while (isRunning) {
Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
System.out.println("Writing customer: " + c);
sourceContext.collect(c);
//sourceContext.emitWatermark(new Watermark(c.getEventTime()));
Thread.sleep(1000);
counter++;
if(counter % 11 == 0) {
System.out.println("Sleeping for 10 seconds");
Thread.sleep(10000);
}
}

Custom Watermark generator has this:
.....
@Override
public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime() );
}

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
}
.....
trigger looks like:
------

 @Override
public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
return TriggerResult.FIRE;
}
}

@Override
public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
// if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
// triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
// return TriggerResult.CONTINUE;
// }

return time == timeWindow.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}

....
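
One detail in the trigger above that may be worth double-checking is the unit of the timer timestamp. Flink event-time timestamps, watermarks, and timers are conventionally expressed in milliseconds since the epoch, whereas LocalTime#toNanoOfDay() returns nanoseconds since midnight. The plain-Java sketch below contrasts the two values for the same instant. The class TimestampUnitsSketch and its helpers are hypothetical, and the sketch assumes UTC and that Customer#getEventTime() is meant to be epoch milliseconds, which the thread does not show.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;

final class TimestampUnitsSketch {

    /** Nanoseconds since midnight, as returned by LocalTime#toNanoOfDay(). */
    static long nanoOfDay(LocalTime time) {
        return time.toNanoOfDay();
    }

    /** Milliseconds since the epoch for the given date and time, interpreted as UTC. */
    static long epochMillis(LocalDate date, LocalTime time) {
        return date.atTime(time).toInstant(ZoneOffset.UTC).toEpochMilli();
    }
}
```

For noon on 2021-05-06 the first helper yields 43,200,000,000,000 and the second 1,620,302,400,000. A timer registered with one of these values will not line up with watermarks expressed in the other unit.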

On Thu, May 6, 2021 at 12:02 PM Arvid Heise <[hidden email]> wrote:
Hi,

Is your CustomerGenerator setting the event timestamp correctly? Are your evictors evicting too early?

You can try to add some debug output into the watermark assigner and see if it's indeed progressing as expected.

On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <[hidden email]> wrote:
This seems to be working fine in processing time but doesn't work in event time. Is there an issue with the way the watermark is defined, or do we need to set up timers?

Please advise.


WORKING:
customerStream
.keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
.process(new CustomAggregateFunction());

NOT WORKING:
customerStream
.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
.keyBy(Customer::getIdentifier)
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
.trigger(EventTimeTrigger.create())
.evictor(new CustomerEvictor())
.process(new CustomAggregateFunction())
.print();

On Thu, May 6, 2021 at 1:53 AM Sam <[hidden email]> wrote:
Adding the code for CustomWatermarkGenerator

.....
@Override
public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime() );
}

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
}
.....
 





Re: Session Windows - not working as expected

Swagat Mishra
Thank you.

I will have a look at DataStream#global().

Is there any other way to maintain state, for example by using ValueState?






Re: Session Windows - not working as expected

Arvid Heise-4
I'm not sure what exactly you want to achieve.

You can always key the values by a constant pseudo-key so that all values end up in the same partition (an alternative to using global, with the same effect). Then you can use a process function to maintain the state. Just make sure that your data volume is low enough, as this part is not parallelizable by definition.
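
As a rough illustration of the pseudo-key idea, the following plain-Java sketch (no Flink involved) counts purchases per key for an arbitrary key selector. The names PseudoKeySketch, PSEUDO_KEY, and countByKey are hypothetical. Keying by the customer ID gives one group per customer, while keying by a constant puts every element into a single group, which is why that variant cannot be parallelized.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class PseudoKeySketch {

    /** Constant key that sends every element to the same group. */
    static final String PSEUDO_KEY = "all";

    /** Counts elements per key, where the key is derived by the given selector. */
    static Map<String, Integer> countByKey(List<String> customerIds,
                                           Function<String, String> keySelector) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String id : customerIds) {
            counts.merge(keySelector.apply(id), 1, Integer::sum);
        }
        return counts;
    }
}
```

Calling countByKey(ids, id -> id) yields one entry per customer, whereas countByKey(ids, id -> PseudoKeySketch.PSEUDO_KEY) yields a single entry holding the total.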





Re: Session Windows - not working as expected

Swagat Mishra
I want to aggregate the user activity, e.g., the number of products the user has purchased in the last 1 hour.

So, User A (ID = USER-A) purchases a product at 10:30 AM, another product at 10:45 AM, and another product at 1:30 AM.

My API should return 2 products purchased if the API call happens at 11:29 AM (10:30, 10:45) and 1 product if the API call happens at 1:45 AM.

The API will access data persisted from the Flink streaming output.

As of now I am doing keyBy on (ID = USER-A).

Do I have to maintain my own calculated state within the process window function? Is the process window function shared across all keys, or is there one instance per key? Do you recommend maintaining the state in State or in Elastic?

Also, if I change the processing to processing time instead of event time, the aggregation happens. Is there any reason why Flink does not produce event-time aggregations the way it does processing-time aggregations?
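
The requirement above can be written down as a small plain-Java sketch that counts one user's purchases within a lookback period before the query time. It does not use Flink, and the names LastHourCountSketch and purchasesInWindow are hypothetical. For illustration it assumes that all times fall on the same day and that the third purchase happens at 13:30; the post says 1:30 AM, so this reading is a guess.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;

final class LastHourCountSketch {

    /**
     * Counts purchases whose time lies in (queryTime - lookback, queryTime],
     * i.e. strictly after the start of the lookback period and not after
     * the query time itself.
     */
    static long purchasesInWindow(List<LocalDateTime> purchaseTimes,
                                  LocalDateTime queryTime,
                                  Duration lookback) {
        LocalDateTime windowStart = queryTime.minus(lookback);
        long count = 0;
        for (LocalDateTime t : purchaseTimes) {
            if (t.isAfter(windowStart) && !t.isAfter(queryTime)) {
                count++;
            }
        }
        return count;
    }
}
```

With purchases at 10:30, 10:45, and 13:30 and a one-hour lookback, a query at 11:29 counts the first two purchases and a query at 13:45 counts only the last one, matching the numbers expected in the post. Note that this is a sliding "last hour" relative to the query time, which is not the same thing as a session window defined by an inactivity gap.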



On Thu, May 6, 2021 at 8:58 AM Swagat Mishra <[hidden email]> wrote:
Yes, the customer generator is setting the event timestamp correctly, as shown below. I debugged and found that the events are treated as late, so they are never processed. That is, in the window operator the method this.isWindowLate(actualWindow) evaluates to false for all events except the first, hence the events are skipped. I am not able to figure out where exactly the issue is.

I have removed the evictor because I don't think I need it yet.

The stream looks like:
customerStream
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
.keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
.window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
.trigger(new EventTimeTrigger())
.process(new CustomAggregateFunction());

Customer generator looks like:
while (isRunning) {
    Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
    System.out.println("Writing customer: " + c);
    sourceContext.collect(c);
    //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
    Thread.sleep(1000);
    counter++;
    if (counter % 11 == 0) {
        System.out.println("Sleeping for 10 seconds");
        Thread.sleep(10000);
    }
}

Custom Watermark generator has this:
.....
@Override
public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
    currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
}

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
    watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
}
.....
trigger looks like:
------

@Override
public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
    if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
        triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
        return TriggerResult.FIRE;
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
    // if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
    //     triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
    //     return TriggerResult.CONTINUE;
    // }

    return time == timeWindow.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
}

....
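
One thing worth checking in the trigger above is the unit of the timer value. Flink's event timestamps, watermarks and timers are conventionally expressed in milliseconds since the epoch, whereas `LocalTime#toNanoOfDay()` returns nanoseconds since midnight. The following plain-Java sketch only compares the two quantities; `TimestampUnitsSketch` and `toEpochMillis` are hypothetical names, the UTC zone is an assumption, and what `Customer#getEventTime()` actually returns is not shown in the thread.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;

/** Compares nano-of-day with epoch milliseconds for the same time of day. */
final class TimestampUnitsSketch {

    /** Epoch milliseconds for a time of day on a given date, interpreted in UTC (assumption). */
    static long toEpochMillis(LocalDate date, LocalTime time) {
        return date.atTime(time).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDate date = LocalDate.of(2021, 5, 6); // date picked for illustration only
        LocalTime time = LocalTime.of(12, 0, 0);

        long epochMillis = toEpochMillis(date, time);       // 1620302400000
        long nanoOfDay = time.plusSeconds(8).toNanoOfDay(); // 43208000000000

        System.out.println("epoch millis: " + epochMillis);
        System.out.println("nano of day : " + nanoOfDay);
    }
}
```

The two numbers differ by more than an order of magnitude, so a timer registered with a nano-of-day value would not line up with a window's `maxTimestamp()` if the latter is in epoch milliseconds.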

On Thu, May 6, 2021 at 12:02 PM Arvid Heise <[hidden email]> wrote:
Hi,

Is your CustomerGenerator setting the event timestamp correctly? Are your evictors evicting too early?

You can try to add some debug output into the watermark assigner and see if it's indeed progressing as expected.

On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <[hidden email]> wrote:
This seems to be working fine in processing time but doesn't work in event time. Is there an issue with the way the watermark is defined, or do we need to set up timers?

Please advise.


WORKING:
customerStream
.keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
.process(new CustomAggregateFunction());

NOT WORKING:
customerStream
.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
.keyBy(Customer::getIdentifier)
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
.trigger(EventTimeTrigger.create())
.evictor(new CustomerEvictor())
.process(new CustomAggregateFunction())
.print();

On Thu, May 6, 2021 at 1:53 AM Sam <[hidden email]> wrote:
Adding the code for CustomWatermarkGenerator

.....
@Override
public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
    currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
}

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
    watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
}
.....
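
To make the relationship between this generator and window firing concrete, here is a small Flink-free sketch. It tracks the highest timestamp seen, derives a watermark from it, and checks whether an event-time window `[start, end)` may fire. `WatermarkSketch`, `windowCanFire` and the out-of-orderness parameter are hypothetical names for illustration; the firing rule `watermark >= end - 1` mirrors the `maxTimestamp()` comparison used in the trigger code in this thread.

```java
/** Flink-free model of a max-timestamp watermark and the event-time firing condition. */
final class WatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Called per element; only the largest timestamp matters. */
    void onEvent(long eventTimestampMillis) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMillis);
    }

    /** Watermark = highest timestamp seen minus the allowed delay; Long.MIN_VALUE before any event. */
    long currentWatermark() {
        return currentMaxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : currentMaxTimestamp - maxOutOfOrdernessMillis;
    }

    /** A window [start, end) is complete once the watermark reaches end - 1. */
    static boolean windowCanFire(long windowEndMillis, long watermark) {
        return watermark >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch w = new WatermarkSketch(0);
        w.onEvent(1_000);
        w.onEvent(6_000);
        System.out.println(windowCanFire(11_000, w.currentWatermark())); // false
        w.onEvent(12_000);
        System.out.println(windowCanFire(11_000, w.currentWatermark())); // true
    }
}
```

In this model a window only fires after a later event pushes the watermark past its end, which is why nothing happens in event time when the timestamps seen by the watermark generator do not advance.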
 

Re: Session Windows - not working as expected

Swagat Mishra
I meant "Do you recommend the state to be maintained in Value State or external store like elastic?"

On Thu, May 6, 2021 at 8:46 PM Swagat Mishra <[hidden email]> wrote:
I want to aggregate the user activity, e.g. the number of products the user has purchased in the last 1 hour.

So, User A (ID = USER-A) purchases 1 product at 10:30, another product at 10:45 AM, and another product at 1:30 AM.

My API should give 2 products purchased if the API call happens at 11:29 AM (10:30, 10:45) and 1 product if the API call happens at 1:45 AM.

The API will access data persisted from the Flink streaming output.

As of now I am doing keyBy on (ID = USER-A).

Do I have to maintain my own calculated state within the process window function? Is the process window function shared across all keys, or is there one instance per key? Do you recommend the state to be maintained in State or elastic?

Also, if I change the processing to processing time instead of event time, the aggregation happens. Is there any reason why Flink could not provide event-time aggregations like the processing-time aggregation?



On Thu, May 6, 2021 at 7:11 PM Arvid Heise <[hidden email]> wrote:
I'm not sure what you want to achieve exactly.

You can always keyBy the values by a constant pseudo-key so that all values end up in the same partition (instead of using global, but with the same effect). Then you can use a process function to maintain the state. Just make sure that your data volume is low enough, as this part is not parallelizable by definition.

On Thu, May 6, 2021 at 10:09 AM Swagat Mishra <[hidden email]> wrote:
Thank you.

I will have a look at DataStream#global.

Is there any other way to maintain state, such as by using ValueState?


On Thu, 6 May 2021 at 1:26 PM, Arvid Heise <[hidden email]> wrote:
If you keyBy, then all directly following functions see only the elements with the same key. So that's the expected behavior and the basis of Flink's parallel processing capabilities.

If you want to generate a window over all customers, you have to use a global window. However, that also means that no parallelization can happen, so I'd discourage that.

A better way would be to perform as many calculations as possible in the process function (for example create a customer with buy information record) and then have a DataStream#global() reshuffle to collect all aggregated information on one node.
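
The two-stage idea described above can be sketched in plain Java without Flink: first aggregate per key, where each aggregation only ever sees its own key's elements, then combine the small per-key results in one place. `KeyedAggregationSketch` and its methods are hypothetical names for illustration and do not correspond to any Flink API.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Flink-free model of "aggregate per key, then collect the aggregates on one node". */
final class KeyedAggregationSketch {

    /** Stage 1: count purchases per customer id; each counter only sees its own key. */
    static Map<String, Integer> countPerKey(List<String> customerIds) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String id : customerIds) {
            counts.merge(id, 1, Integer::sum);
        }
        return counts;
    }

    /** Stage 2: combine the already-aggregated per-key results in a single place. */
    static int total(Map<String, Integer> perKey) {
        int sum = 0;
        for (int count : perKey.values()) {
            sum += count;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<String> purchases = Arrays.asList("A", "B", "A", "C", "B", "A");
        Map<String, Integer> perKey = countPerKey(purchases);
        System.out.println(perKey);        // {A=3, B=2, C=1}
        System.out.println(total(perKey)); // 6
    }
}
```

The second stage only handles one record per key instead of every raw event, which is the reason for doing as much work as possible before the non-parallel step.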

Re: Session Windows - not working as expected

Swagat Mishra
I am able to maintain a list state in the process function and aggregate the values. How do I get a notification/event to remove a value from the stored list state?
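
As a rough illustration of the removal step being asked about, here is a plain-Java sketch of a per-user "last 1 hour" counter that evicts expired purchases whenever time advances. `LastHourCounter`, `countAt` and `at` are hypothetical names; the sketch assumes timestamps and queries arrive in ascending order. In a Flink job the eviction would presumably be driven by a timer callback rather than by the query itself, but that wiring is not shown here and should be checked against the Flink documentation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Flink-free model of a rolling one-hour purchase count for a single user. */
final class LastHourCounter {
    private static final long WINDOW_MILLIS = 60 * 60 * 1000L;
    private final Deque<Long> purchaseTimes = new ArrayDeque<>();

    /** Records a purchase; timestamps are assumed to arrive in ascending order. */
    void add(long timestampMillis) {
        purchaseTimes.addLast(timestampMillis);
    }

    /** Drops purchases at or before (now - 1 hour) and returns how many remain. */
    int countAt(long nowMillis) {
        long cutoff = nowMillis - WINDOW_MILLIS;
        while (!purchaseTimes.isEmpty() && purchaseTimes.peekFirst() <= cutoff) {
            purchaseTimes.removeFirst();
        }
        return purchaseTimes.size();
    }

    /** Helper: milliseconds since midnight for hour:minute. */
    static long at(int hour, int minute) {
        return (hour * 60L + minute) * 60_000L;
    }

    public static void main(String[] args) {
        LastHourCounter userA = new LastHourCounter();
        userA.add(at(10, 30));
        userA.add(at(10, 45));
        System.out.println(userA.countAt(at(11, 29))); // 2
        System.out.println(userA.countAt(at(11, 40))); // 1
        System.out.println(userA.countAt(at(11, 46))); // 0
    }
}
```

The first query reproduces the 11:29 case from the example earlier in the thread (purchases at 10:30 and 10:45 both count); the two later query times are made up to show purchases dropping out of the window.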
