Why is the same sliding processing-time TimeWindow triggered twice?


远远
Hi everyone,
Today I tested a sliding processing-time TimeWindow and printed some of the window's properties. I found that the same sliding window is printed twice, as follows:

now   ===> 2018-09-16 15:11:44
start ===> 2018-09-16 15:10:45
end   ===> 2018-09-16 15:11:45
max   ===> 2018-09-16 15:11:44
TimeWindow{start=1537081845000, end=1537081905000}
aggreation
now   ===> 2018-09-16 15:11:45
start ===> 2018-09-16 15:10:45
end   ===> 2018-09-16 15:11:45
max   ===> 2018-09-16 15:11:44
TimeWindow{start=1537081845000, end=1537081905000}
aggreation
 
But when I do a sum operation instead, this does not happen. Why is that?
Thanks.

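My test code:

import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.joda.time.DateTime

import scala.util.Random

object SlidingProcessTimeWindowTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Processing time is the default time characteristic in Flink 1.4.
    env
      // Emits "<0..99>:FRI" strings in a tight loop, with no pause.
      .addSource((context: SourceContext[String]) => {
        while (true) context.collect(new Random().nextInt(100) + ":FRI")
      })
      .keyBy(s => s.endsWith("FRI"))
      // Sliding window: 1 minute long, sliding every 5 seconds.
      .timeWindow(Time.minutes(1), Time.seconds(5))
      .apply((key: Boolean, w: TimeWindow, iter: Iterable[String], coll: Collector[String]) => {
        println("now   ===> " + convert(DateTime.now().getMillis))
        println("start ===> " + convert(w.getStart))
        println("end   ===> " + convert(w.getEnd))
        println("max   ===> " + convert(w.maxTimestamp()))
        println(w)
        // var reduce: Long = 0
        // for (e <- iter) {
        //   reduce += e.substring(0, e.length - 4).toInt
        // }
        // println("reduce ==> " + reduce)
        coll.collect("aggreation")
      }).setParallelism(1)
      .print().setParallelism(1)

    env.execute()
  }

  // Formats epoch milliseconds as a local date-time string.
  def convert(time: Long): String =
    new DateTime(time).toString("yyyy-MM-dd HH:mm:ss")
}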
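The start, end and max values in the log follow from how sliding windows are assigned: with a 1-minute size and a 5-second slide, every record belongs to 12 overlapping windows, and a window's max timestamp is its end minus 1 ms. The plain-Scala sketch below (no Flink dependency) reproduces that assignment, assuming a zero window offset and non-negative timestamps. `SlidingWindowSketch`, `Window` and `assign` are hypothetical names, not Flink's API; the arithmetic is only meant to mirror what Flink's sliding assigners compute.

```scala
// Plain-Scala sketch of sliding-window assignment; no Flink dependency.
// Assumes a window offset of 0 and non-negative epoch-millisecond timestamps.
object SlidingWindowSketch {

  // Hypothetical stand-in for a window covering [start, end) in epoch millis.
  final case class Window(start: Long, end: Long) {
    // Largest timestamp that still belongs to the window (end - 1 ms).
    def maxTimestamp: Long = end - 1
  }

  // All windows of length `size`, sliding by `slide`, that contain `ts`,
  // ordered from the latest start to the earliest.
  def assign(ts: Long, size: Long, slide: Long): List[Window] = {
    val lastStart = ts - ts % slide
    Iterator
      .iterate(lastStart)(_ - slide)
      .takeWhile(_ > ts - size)
      .map(start => Window(start, start + size))
      .toList
  }

  def main(args: Array[String]): Unit = {
    val size = 60000L  // Time.minutes(1)
    val slide = 5000L  // Time.seconds(5)
    // 1 ms before the end of TimeWindow{start=1537081845000, end=1537081905000}.
    val ts = 1537081904999L
    val windows = assign(ts, size, slide)
    println(windows.size)
    println(windows.last)
  }
}
```

Running `main` prints 12 and then Window(1537081845000,1537081905000), the window from the log above, which still contains the instant 1 ms before its end.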

Re: Why is the same sliding processing-time TimeWindow triggered twice?

Xingcan Cui
Hi,

I’ve tested your code in my local environment and everything worked fine, so it’s a little strange to see output like yours. Could you give more information about your environment, e.g., your Flink version and execution settings?

Thanks,
Xingcan

On Sep 16, 2018, at 3:19 PM, 远远 <[hidden email]> wrote:


Fwd: Why is the same sliding processing-time TimeWindow triggered twice?

远远


---------- Forwarded message ---------
From: 远远 <[hidden email]>
Date: Sun, Sep 16, 2018, 4:08 PM
Subject: Re: Why is the same sliding processing-time TimeWindow triggered twice?
To: <[hidden email]>


Hi, the Flink version I tested with is 1.4.2. I just ran the test code in a local environment in IntelliJ IDEA, and all the settings are in the test code itself.
My OS is Deepin 15.7 (a Debian-based Linux distribution).

I tried again, and the output is as follows:
now   ===> 2018-09-16 16:06:09
start ===> 2018-09-16 16:05:10
end   ===> 2018-09-16 16:06:10
max   ===> 2018-09-16 16:06:09
TimeWindow{start=1537085110000, end=1537085170000}
aggreation
now   ===> 2018-09-16 16:06:09
start ===> 2018-09-16 16:05:10
end   ===> 2018-09-16 16:06:10
max   ===> 2018-09-16 16:06:09
TimeWindow{start=1537085110000, end=1537085170000}
aggreation
now   ===> 2018-09-16 16:06:16
start ===> 2018-09-16 16:05:15
end   ===> 2018-09-16 16:06:15
max   ===> 2018-09-16 16:06:14
TimeWindow{start=1537085115000, end=1537085175000}
aggreation
now   ===> 2018-09-16 16:06:19
start ===> 2018-09-16 16:05:20
end   ===> 2018-09-16 16:06:20
max   ===> 2018-09-16 16:06:19
TimeWindow{start=1537085120000, end=1537085180000}
aggreation
now   ===> 2018-09-16 16:06:20
start ===> 2018-09-16 16:05:20
end   ===> 2018-09-16 16:06:20
max   ===> 2018-09-16 16:06:19
TimeWindow{start=1537085120000, end=1537085180000}
aggreation
now   ===> 2018-09-16 16:06:24
start ===> 2018-09-16 16:05:25
end   ===> 2018-09-16 16:06:25
max   ===> 2018-09-16 16:06:24
TimeWindow{start=1537085125000, end=1537085185000}
aggreation
now   ===> 2018-09-16 16:06:24
start ===> 2018-09-16 16:05:25
end   ===> 2018-09-16 16:06:25
max   ===> 2018-09-16 16:06:24
TimeWindow{start=1537085125000, end=1537085185000}
aggreation
now   ===> 2018-09-16 16:06:25
start ===> 2018-09-16 16:05:25
end   ===> 2018-09-16 16:06:25
max   ===> 2018-09-16 16:06:24
TimeWindow{start=1537085125000, end=1537085185000}
aggreation
now   ===> 2018-09-16 16:06:29
start ===> 2018-09-16 16:05:30
end   ===> 2018-09-16 16:06:30
max   ===> 2018-09-16 16:06:29
TimeWindow{start=1537085130000, end=1537085190000}
aggreation
now   ===> 2018-09-16 16:06:29
start ===> 2018-09-16 16:05:30
end   ===> 2018-09-16 16:06:30
max   ===> 2018-09-16 16:06:29
TimeWindow{start=1537085130000, end=1537085190000}
aggreation
now   ===> 2018-09-16 16:06:30
start ===> 2018-09-16 16:05:30
end   ===> 2018-09-16 16:06:30
max   ===> 2018-09-16 16:06:29
TimeWindow{start=1537085130000, end=1537085190000}
aggreation
now   ===> 2018-09-16 16:06:36
start ===> 2018-09-16 16:05:35
end   ===> 2018-09-16 16:06:35
max   ===> 2018-09-16 16:06:34
TimeWindow{start=1537085135000, end=1537085195000}
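The repeated entries are the same window value, not neighboring windows. The small plain-Scala sketch below extracts the `TimeWindow{start=..., end=...}` lines from the log above and counts how often each window was emitted; `FiringCount` and its members are hypothetical names, and the log lines are copied verbatim.

```scala
object FiringCount {
  // Matches log lines such as: TimeWindow{start=1537085110000, end=1537085170000}
  private val WindowLine = """TimeWindow\{start=(\d+), end=(\d+)\}""".r

  // The TimeWindow lines copied from the log above, in order.
  val loggedWindows: String =
    """TimeWindow{start=1537085110000, end=1537085170000}
      |TimeWindow{start=1537085110000, end=1537085170000}
      |TimeWindow{start=1537085115000, end=1537085175000}
      |TimeWindow{start=1537085120000, end=1537085180000}
      |TimeWindow{start=1537085120000, end=1537085180000}
      |TimeWindow{start=1537085125000, end=1537085185000}
      |TimeWindow{start=1537085125000, end=1537085185000}
      |TimeWindow{start=1537085125000, end=1537085185000}
      |TimeWindow{start=1537085130000, end=1537085190000}
      |TimeWindow{start=1537085130000, end=1537085190000}
      |TimeWindow{start=1537085130000, end=1537085190000}
      |TimeWindow{start=1537085135000, end=1537085195000}""".stripMargin

  // Counts how often each (start, end) pair appears, in first-seen order.
  def countFirings(log: String): List[((Long, Long), Int)] = {
    val windows = WindowLine
      .findAllMatchIn(log)
      .map(m => (m.group(1).toLong, m.group(2).toLong))
      .toList
    windows.distinct.map(w => w -> windows.count(_ == w))
  }

  def main(args: Array[String]): Unit =
    countFirings(loggedWindows).foreach { case ((start, end), n) =>
      println(s"[$start, $end) emitted $n time(s)")
    }
}
```

For this log it reports six windows emitted 2, 1, 2, 3, 3 and 1 times; the log stops mid-entry, so the last count may be incomplete.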


On Sun, Sep 16, 2018 at 3:55 PM, Xingcan Cui <[hidden email]> wrote:

Re: Why is the same sliding processing-time TimeWindow triggered twice?

Rong Rong
I haven't dug too deep into this, but it seems like this line is the reason:
                .keyBy(s => s.endsWith("FRI"))
Essentially, you are creating two key partitions (true and false), and I believe each of them has its own sliding window. Can you print out the key for each window?
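This hypothesis can be checked against the source without running Flink: every record the test source emits has the form `<n>:FRI`. The plain-Scala sketch below applies the same key selector to records generated the same way and collects the distinct keys; `KeySpaceCheck` and its members are hypothetical helpers. Inside the Flink job itself, the key is the first argument passed to the `apply` window function, so it could also be printed there.

```scala
import scala.util.Random

object KeySpaceCheck {
  // Generates a record the same way as the test source: "<0..99>:FRI".
  def record(rnd: Random): String = s"${rnd.nextInt(100)}:FRI"

  // The same key selector as .keyBy(s => s.endsWith("FRI")).
  val keyOf: String => Boolean = s => s.endsWith("FRI")

  // Collects the distinct keys produced by `n` generated records.
  def distinctKeys(n: Int, seed: Long): Set[Boolean] = {
    val rnd = new Random(seed)
    Iterator.fill(n)(record(rnd)).map(keyOf).toSet
  }

  def main(args: Array[String]): Unit =
    println(distinctKeys(100000, seed = 42L))
}
```

It prints Set(true): with this source, every record goes to the single key true, and no record is ever assigned the key false.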

--
Rong

On Sun, Sep 16, 2018 at 1:23 AM 远远 <[hidden email]> wrote:

