Why is the size of each checkpoint increasing?


Why is the size of each checkpoint increasing?

陈Darling

Flink version is 1.8.1.
The example is adapted from the TopSpeedWindowing example.
DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
        .assignTimestampsAndWatermarks(new CarTimestamp()).setParallelism(parallelism)
        .keyBy(0)
        .countWindow(countSize, slideSize)
        .trigger(DeltaTrigger.of(triggerMeters,
                new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public double getDelta(
                            Tuple4<Integer, Integer, Double, Long> oldDataPoint,
                            Tuple4<Integer, Integer, Double, Long> newDataPoint) {
                        return newDataPoint.f2 - oldDataPoint.f2;
                    }
                }, carData.getType().createSerializer(env.getConfig())))
        .maxBy(1).setParallelism(parallelism);

The size of each checkpoint keeps growing, from about 100 KB up to 100 MB.

Why is the size of each checkpoint increasing?
In DeltaTrigger.java I found the clear() method. As I understand it, since the trigger state is cleared, the size of every checkpoint should stay roughly constant:
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
    ctx.getPartitionedState(stateDesc).clear();
}
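
For reference, onElement in DeltaTrigger.java looks roughly like this (paraphrased from the Flink source). Note that it only ever returns FIRE or CONTINUE, never FIRE_AND_PURGE:

// Paraphrased from DeltaTrigger.java: the trigger's own state holds only the
// last seen element. The buffered window elements live in separate state owned
// by the WindowOperator and are only dropped when a trigger result PURGEs.
@Override
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
    ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
    if (lastElementState.value() == null) {
        lastElementState.update(element);
        return TriggerResult.CONTINUE;
    }
    if (deltaFunction.getDelta(lastElementState.value(), element) > threshold) {
        lastElementState.update(element);
        return TriggerResult.FIRE; // FIRE, not FIRE_AND_PURGE
    }
    return TriggerResult.CONTINUE;
}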


Has anyone encountered a similar problem?





Darling 
Andrew D.Lin




Re: Why is the size of each checkpoint increasing?

Till Rohrmann
I think the size of the checkpoint strongly depends on the data you are feeding into this function. Depending on the actual values, it might be that you never fire the window. Please verify what carData actually returns.
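
A quick way to inspect the input (a sketch; the field layout assumed here is the TopSpeedWindowing tuple of (carId, speed, distance, timestamp)):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple4;

// Hypothetical debugging tap: print each car's distance (f2) so you can see
// whether consecutive values ever differ by more than triggerMeters.
carData.map(new MapFunction<Tuple4<Integer, Integer, Double, Long>, String>() {
    @Override
    public String map(Tuple4<Integer, Integer, Double, Long> value) {
        return "car=" + value.f0 + " speed=" + value.f1 + " distance=" + value.f2;
    }
}).print();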

Cheers,
Till





Fwd: Why is the size of each checkpoint increasing?

陈Darling
Thanks, Till. Your answer inspired me.

countWindow defaults to using a CountTrigger, but I set my own trigger instead:

Parallelism is 1

.trigger(DeltaTrigger.of(50, deltaFunction, stateSerializer))


Through testing, I found that the data is generated much faster than the trigger fires: countSize and slideSize are 300000, while the DeltaTrigger threshold is 50.
The count window therefore buffers far more elements than the trigger ever releases.

Could this be the reason? (A possible workaround is sketched below.)
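
If that is the cause, one thing worth trying (a sketch, not something I have verified on this job) is wrapping the delta trigger in Flink's PurgingTrigger, which turns every FIRE into FIRE_AND_PURGE so the fired window's buffered elements are dropped:

// PurgingTrigger converts the wrapped trigger's FIRE results into
// FIRE_AND_PURGE, so the elements buffered in the global window behind
// countWindow() are evicted after each firing instead of accumulating.
.trigger(PurgingTrigger.of(DeltaTrigger.of(50, deltaFunction, stateSerializer)))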



Darling 
Andrew D.Lin








Re: Why is the size of each checkpoint increasing?

Congxian Qiu
In reply to this post by 陈Darling
Hi Andrew,
The Flink docs [1] state: "Flink guarantees removal only for time-based windows and not for other types, e.g. global windows (see Window Assigners)." Since count windows are built on global windows, the state of a fired window may never be cleared. You can verify this by checking which TriggerResult value your trigger returns for each element (one way to log it is sketched below).
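
One way to do that check (a hypothetical helper, not something shipped with Flink) is a wrapper trigger that delegates to the real trigger and logs every TriggerResult it returns:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

// Hypothetical wrapper: logs each TriggerResult so you can see whether
// FIRE_AND_PURGE (which evicts the window contents) is ever returned,
// or only FIRE/CONTINUE (which keep them).
public class LoggingTrigger<T, W extends Window> extends Trigger<T, W> {

    private final Trigger<T, W> delegate;

    public LoggingTrigger(Trigger<T, W> delegate) {
        this.delegate = delegate;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        TriggerResult result = delegate.onElement(element, timestamp, window, ctx);
        System.out.println("onElement -> " + result);
        return result;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return delegate.onProcessingTime(time, window, ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return delegate.onEventTime(time, window, ctx);
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        delegate.clear(window, ctx);
    }
}

It would be used as .trigger(new LoggingTrigger<>(...)) around the original DeltaTrigger.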

