Flink Training - why cannot keyBy hour?

Eleanore Jin
Hi experts, 

I am going through the Ververica Flink training. The windows lab (https://training.ververica.com/exercises/windows) asks you to compute, for each hour, which driver earned the most in tips.

The logic is:
0. keyBy driverId
1. create a 1-hour tumbling window based on event time
2. sum up all the tips for each driver within that 1-hour window
3. create a 1-hour windowAll (a non-keyed window over all drivers)
4. find the maximum tip total

Here is the sample code:
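import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Per driver: sum the tips within each 1-hour event-time window
SingleOutputStreamOperator<Tuple3<Long, Long, Float>> aggregatedTipsPerDriver = fares
        .keyBy(fare -> fare.driverId)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .process(new SumTipsFunction());

// Tuple3: the timestamp for the end of the hour, the driverId, and the total of that driver's tips for that hour
// Across all drivers: keep the record with the largest total (field 2) for each hour
SingleOutputStreamOperator<Tuple3<Long, Long, Float>> hourlyMax = aggregatedTipsPerDriver
        .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
        .maxBy(2);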
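For readers without a Flink setup, the two-stage logic above can be simulated in plain Java. This is a sketch under stated assumptions, not Flink code: the class `HourlyMaxSketch`, the `Fare` type and the sample fares are hypothetical, and windows are assumed to be 1-hour tumbling windows aligned to the epoch (offset 0). The sample data were picked so the result has the same form as the Expected line of the test output. The point it shows is that the windowed pipeline emits exactly one record per hour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Flink); all names and data are hypothetical.
public class HourlyMaxSketch {
    static final long HOUR_MS = 3_600_000L;

    // A hypothetical fare: driver, event time (epoch millis), tip.
    static final class Fare {
        final long driverId;
        final long eventTime;
        final float tip;

        Fare(long driverId, long eventTime, float tip) {
            this.driverId = driverId;
            this.eventTime = eventTime;
            this.tip = tip;
        }
    }

    // End of the 1-hour tumbling window containing ts (assumes offset 0 and ts >= 0).
    static long windowEnd(long ts) {
        return ts - ts % HOUR_MS + HOUR_MS;
    }

    // Returns one "(windowEnd,driverId,total)" string per hour, formatted like Tuple3.toString().
    static List<String> hourlyMax(List<Fare> fares) {
        // Stage 1, like keyBy(driverId).window(...).process(...): windowEnd -> driverId -> summed tips
        Map<Long, Map<Long, Float>> sums = new TreeMap<>();
        for (Fare f : fares) {
            sums.computeIfAbsent(windowEnd(f.eventTime), k -> new TreeMap<>())
                .merge(f.driverId, f.tip, Float::sum);
        }
        // Stage 2, like windowAll(...).maxBy(2): a single result per hour
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, Map<Long, Float>> hour : sums.entrySet()) {
            long bestDriver = -1;
            float best = Float.NEGATIVE_INFINITY;
            for (Map.Entry<Long, Float> d : hour.getValue().entrySet()) {
                // Strictly larger only: on a tie, the lower driverId (TreeMap order) wins
                if (d.getValue() > best) {
                    best = d.getValue();
                    bestDriver = d.getKey();
                }
            }
            out.add("(" + hour.getKey() + "," + bestDriver + "," + best + ")");
        }
        return out;
    }

    static List<Fare> sampleFares() {
        long t0 = 946684800000L; // 2000-01-01T00:00:00Z
        return List.of(
                new Fare(1, t0 + 1_000, 1.0f),
                new Fare(1, t0 + 2_000, 5.0f),
                new Fare(2, t0 + 3_000, 3.0f),
                new Fare(2, t0 + HOUR_MS + 1_000, 20.0f),
                new Fare(3, t0 + HOUR_MS + 2_000, 4.0f));
    }

    public static void main(String[] args) {
        System.out.println(hourlyMax(sampleFares()));
        // prints [(946688400000,1,6.0), (946692000000,2,20.0)]
    }
}
```

Driver 1 totals 6.0 in the first hour and driver 2 totals 20.0 in the second, so each hour contributes a single line.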

The question on the 4th slide is: why can't we key by the hour instead?
If I change the implementation to key by the hour and run HourlyTipsTest,
the testMaxAcrossDrivers test fails:
// (946688400000,1,6.0) -> window end timestamp: 946688400000, driverId: 1, largest tip total: 6.0
Expected :[(946688400000,1,6.0), (946692000000,2,20.0)]
Actual   :[(946688400000,1,6.0), (946692000000,2,20.0), (946692000000,2,20.0)]
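The first field of each tuple is a window-end timestamp in epoch milliseconds. The sketch below decodes the two values with `java.time.Instant` and shows which hourly window an event falls into. It assumes 1-hour tumbling windows with zero offset and non-negative timestamps; as far as I know, Flink's tumbling assigner then reduces to `ts - ts % size` for the window start. `WindowEndDecoder` is a hypothetical helper name.

```java
import java.time.Instant;

// Decodes window-end timestamps; the class and method names are hypothetical.
public class WindowEndDecoder {
    static final long HOUR_MS = 3_600_000L;

    // Start of the 1-hour tumbling window containing ts (assumes offset 0 and ts >= 0).
    static long windowStart(long ts) {
        return ts - ts % HOUR_MS;
    }

    // Windows are half-open [start, end), so the end itself belongs to the next window.
    static long windowEnd(long ts) {
        return windowStart(ts) + HOUR_MS;
    }

    public static void main(String[] args) {
        for (long end : new long[] {946688400000L, 946692000000L}) {
            System.out.println(end + " = " + Instant.ofEpochMilli(end));
        }
        long event = Instant.parse("2000-01-01T00:30:00Z").toEpochMilli();
        System.out.println("event at 00:30 -> window ends " + Instant.ofEpochMilli(windowEnd(event)));
    }
}
```

This prints 2000-01-01T01:00:00Z and 2000-01-01T02:00:00Z for the two timestamps, i.e. the test covers the first two hours of 2000-01-01 (UTC).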

Thanks a lot!
Eleanore

Re: Flink Training - why cannot keyBy hour?

David Anderson
Eleanore,

Yes, if you change the implementation in the way that is suggested by the slide, the tests will fail. But it's more interesting to observe the behavior in the console.

The notes that go with that slide explain the situation in more detail (use alt-p or option-p to see the notes). But to recap here, there are two related effects:

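(1) Instead of producing a single result at the end of each hour, this alternative implementation produces a result for every incoming event (every per-driver hourly total). In other words, it produces a stream of running maxima that eventually arrives at the same maximum value produced by the timeWindowAll. That is why your test sees an extra record for the hour ending at 946692000000.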
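Effect (1) can be reproduced without Flink. This plain-Java sketch models a rolling `keyBy(hour).maxBy(2)`: each incoming per-driver hourly total updates the maximum stored for its hour, and the current maximum is emitted every time. The class names and the three input records are hypothetical, chosen so the output has the same shape as the Actual line in the test output. Like `maxBy`, it replaces the stored record only on a strictly larger value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink) of a rolling keyBy(hour).maxBy(2); names and data are hypothetical.
public class RollingMaxSketch {
    // A per-driver hourly total, shaped like the Tuple3 (windowEnd, driverId, totalTips).
    static final class HourlyTotal {
        final long windowEnd;
        final long driverId;
        final float total;

        HourlyTotal(long windowEnd, long driverId, float total) {
            this.windowEnd = windowEnd;
            this.driverId = driverId;
            this.total = total;
        }

        @Override
        public String toString() {
            return "(" + windowEnd + "," + driverId + "," + total + ")";
        }
    }

    static List<String> rollingMaxByHour(List<HourlyTotal> input) {
        // "Keyed state": the current maximum per hour. Nothing ever removes an entry.
        Map<Long, HourlyTotal> maxPerHour = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (HourlyTotal t : input) {
            HourlyTotal current = maxPerHour.get(t.windowEnd);
            // Replace only on a strictly larger total, so ties keep the earlier record.
            if (current == null || t.total > current.total) {
                current = t;
                maxPerHour.put(t.windowEnd, t);
            }
            out.add(current.toString()); // one output per input record, not one per hour
        }
        return out;
    }

    static List<HourlyTotal> sampleInput() {
        return List.of(
                new HourlyTotal(946688400000L, 1, 6.0f),
                new HourlyTotal(946692000000L, 2, 20.0f),
                new HourlyTotal(946692000000L, 3, 4.0f));
    }

    public static void main(String[] args) {
        System.out.println(rollingMaxByHour(sampleInput()));
        // prints [(946688400000,1,6.0), (946692000000,2,20.0), (946692000000,2,20.0)]
    }
}
```

The intermediate records depend on arrival order: if (946692000000,3,4.0) arrived before (946692000000,2,20.0), the stream would show (946692000000,3,4.0) first and only then the true maximum.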

(2) With timeWindowAll, once the results for a given hour have been produced, Flink frees the state associated with the window for that hour. It knows, based on the watermarks, that no more events are expected for that window, so the state is no longer needed and can be cleared. But with keyBy(hour).maxBy, the state for each key (each hour) is kept forever. This is why this is not a good approach: the keyspace is unbounded (a new key appears every hour), and we can't intervene to clean up stale state.
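Effect (2) concerns state rather than output. This plain-Java sketch counts how many per-hour entries each approach still holds after a simulated year. Everything in it is hypothetical: the class name, one event per hour, and a watermark that simply equals the latest event time. The window version drops an hour's entry once the watermark reaches the window's last timestamp (end - 1), which is when an event-time window fires; with the default allowed lateness of zero, that is also when Flink can clean up the window state.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (no Flink) of state retention; names, events and watermark are hypothetical.
public class StateRetentionSketch {
    static final long HOUR_MS = 3_600_000L;

    static long windowEnd(long ts) {
        return ts - ts % HOUR_MS + HOUR_MS;
    }

    // Returns {entries left in the never-cleared per-hour map, entries left in purged window state}.
    static int[] stateSizesAfter(int hours) {
        Map<Long, Integer> keyedByHour = new HashMap<>(); // keyBy(hour).maxBy(...): one key per hour, kept forever
        Map<Long, Integer> windowState = new HashMap<>(); // windowAll(...): dropped once the window fires
        for (int h = 0; h < hours; h++) {
            long eventTime = h * HOUR_MS + 1_000;         // one event shortly after each hour starts
            long end = windowEnd(eventTime);
            keyedByHour.merge(end, 1, Integer::sum);
            windowState.merge(end, 1, Integer::sum);
            // Assumption: the watermark equals the latest event time (no out-of-orderness).
            long watermark = eventTime;
            // A window fires when the watermark reaches end - 1; its state is then cleared.
            windowState.keySet().removeIf(e -> e - 1 <= watermark);
        }
        return new int[] {keyedByHour.size(), windowState.size()};
    }

    public static void main(String[] args) {
        int[] sizes = stateSizesAfter(24 * 365);
        System.out.println("per-hour keys kept forever: " + sizes[0] + ", open windows: " + sizes[1]);
        // prints per-hour keys kept forever: 8760, open windows: 1
    }
}
```

The keyed version grows by one entry per hour for as long as the job runs, while the windowed version only ever holds the hour that is still open.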

Regards,
David

On Wed, Jul 1, 2020 at 2:26 AM Eleanore Jin <[hidden email]> wrote:

Re: Flink Training - why cannot keyBy hour?

Eleanore Jin
Hi David, 

Thanks a lot for the explanation!

Eleanore

On Thu, Jul 2, 2020 at 6:30 AM David Anderson <[hidden email]> wrote: