DataStream API min max aggregation on other fields


DataStream API min max aggregation on other fields

Lu Weizheng
Hi all,

On a KeyedStream, when I use maxBy or minBy, I get the max or min element: the other fields are kept from that max or min element. This is quite clear. However, when I use max or min, how does Flink handle the other fields?

val tupleStream = senv.fromElements(
  (0, 0, 0), (0, 1, 1), (0, 2, 2),
  (1, 0, 6), (1, 1, 7), (1, 2, 8)
)
// Output:
// (0,0,0)
// (0,0,1)
// (0,0,2)
// (1,0,6)
// (1,0,7)
// (1,0,8)
val maxStream = tupleStream.keyBy(0).max(2).print()
In this case, the second field keeps the first element's value, 0.
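The difference can be sketched in plain Scala (a simulation of the semantics for one key group, not Flink code): max(2) keeps the other fields from the first record and only updates field 2, while maxBy(2) keeps the whole record that holds the maximum.

```scala
// Plain-Scala simulation of the two aggregation semantics for one key group.
object MaxVsMaxBy {
  type T = (Int, Int, Int)

  // max(2): the non-aggregated fields stay frozen at the first record's values.
  def maxOnField2(records: Seq[T]): Seq[T] =
    records.tail.scanLeft(records.head) { (acc, r) =>
      (acc._1, acc._2, math.max(acc._3, r._3))
    }

  // maxBy(2): the entire winning record is kept.
  def maxByField2(records: Seq[T]): Seq[T] =
    records.tail.scanLeft(records.head) { (acc, r) =>
      if (r._3 > acc._3) r else acc
    }

  def main(args: Array[String]): Unit = {
    val key0 = Seq((0, 0, 0), (0, 1, 1), (0, 2, 2))
    println(maxOnField2(key0)) // List((0,0,0), (0,0,1), (0,0,2))
    println(maxByField2(key0)) // List((0,0,0), (0,1,1), (0,2,2))
  }
}
```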

import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

class IntTupleSource extends RichSourceFunction[(Int, Int, Int)] {

  var isRunning: Boolean = true
  var i = 0

  override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {
    while (isRunning) {
      // collect the generated tuple into the SourceContext
      srcCtx.collect((0, i, i))
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// Output:
// (0,0,0)
// (0,1,2)
// (0,3,4)
// (0,5,6)
// (0,7,8)
// (0,9,10)

val maxWindowStream = senv.addSource(new IntTupleSource)
  .keyBy(0)
  .timeWindow(Time.milliseconds(2000))
  .max(2).print()


In this case, the result is not so clear...

So, for max and min, can these two operators not guarantee the values of the other fields?

Thank you so much if anyone can reply.

Weizheng
Reply | Threaded
Open this post in threaded view
|

Re: DataStream API min max aggregation on other fields

vino yang
Hi weizheng,

IMHO, I am not sure what is unclear to you. Is the result incorrect? Can you share the correct result based on your understanding?

The "keyBy" specifies the grouping field, and min/max aggregate the field at the position you specified.

Best,
Vino

Lu Weizheng <[hidden email]> wrote on Thu, Dec 19, 2019 at 5:00 PM:

Re: DataStream API min max aggregation on other fields

Biao Liu
Hi Lu,

[hidden email] I think what he means is that the "max" semantics between window and non-window are different. It changes non-aggregated fields unpredictably.

That's really an interesting question.

I took a look at the relevant implementation. From the code's perspective, "max" always keeps the non-aggregated fields from the first arrived record, which should be (0, 0, x) in this case. However, when the window is purged, the state (which keeps the non-aggregated fields of the first arrived record and the maximum field) is cleared. That means the "first arrived record" is reset whenever a window is purged. That's why the second column increases unpredictably.
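That behavior can be imitated in plain Scala (a sketch, not Flink code; it assumes the first window happened to contain one record and each later window two, which matches the printed output):

```scala
// Plain-Scala sketch of windowed max(2) with per-window state clearing.
object WindowMaxSketch {
  type T = (Int, Int, Int)

  // Per-window result: non-aggregated fields come from the first record that
  // arrived in the window, because the state was cleared at the last purge.
  def windowMax(window: Seq[T]): T = {
    val first = window.head
    (first._1, first._2, window.map(_._3).max)
  }

  def main(args: Array[String]): Unit = {
    val records = (0 to 10).map(i => (0, i, i))
    // Assumed window assignment: first window got 1 record, later ones got 2.
    val windows = Seq(records.take(1)) ++ records.drop(1).grouped(2).toSeq
    windows.map(windowMax).foreach(println)
    // (0,0,0) (0,1,2) (0,3,4) (0,5,6) (0,7,8) (0,9,10)
  }
}
```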

The semantics here are quite confusing to me.

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Dec 2019 at 17:50, vino yang <[hidden email]> wrote:

Re: DataStream API min max aggregation on other fields

Lu Weizheng
Yes, the unpredictable non-key, non-aggregated fields confused me. As Biao said, it is because of the purged window state.
So when I use max or min, I should rely only on the aggregated field; the other fields are not well defined, and I should take care not to use them.
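When the other fields do matter, maxBy keeps the whole winning element, so every field stays consistent; a plain-Scala sketch of that reduce-style semantics (not Flink code):

```scala
// Plain-Scala sketch of maxBy-style reduction: keep the entire record that
// holds the maximum of field 2, so all other fields come from that record.
object DeterministicMax {
  type T = (Int, Int, Int)

  def maxBy2(a: T, b: T): T = if (b._3 > a._3) b else a

  def main(args: Array[String]): Unit = {
    val window = Seq((0, 1, 1), (0, 2, 2))
    println(window.reduce(maxBy2)) // (0,2,2): all fields from the max element
  }
}
```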

Thank you guys for your replies!

From: Biao Liu <[hidden email]>
Sent: Thursday, December 19, 2019 18:10
To: vino yang <[hidden email]>
Cc: Lu Weizheng <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: DataStream API min max aggregation on other fields