Confused window operation

Jeff Zhang
Hi all,

I am a little confused by the following window operation. Here is the code:

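// The Scala API import also brings the implicit TypeInformation into scope
import org.apache.flink.streaming.api.scala._

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setParallelism(1)
val data = senv.fromElements("hello world", "hello flink", "hello hadoop")

data.flatMap(line => line.split("\\s"))  // split each line into words
  .map(w => (w, 1))                      // pair each word with a count of 1
  .keyBy(0)                              // key by the word
  .countWindow(2, 1)                     // count window: size 2, slide 1
  .sum(1)                                // sum the counts within each window
  .print("******************")

senv.execute()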

And this is the output:
******************> (hello,1)
******************> (world,1)
******************> (hello,2)
******************> (flink,1)
******************> (hello,2)
******************> (hadoop,1)

As I understand it, there are 3 windows:
window 1
(hello, world)
window 2
(hello, world)
(hello, flink)
window 3
(hello, flink)
(hello, hadoop)
So for the first window the output should be (hello, 1), (world, 1),
for the second window it should be (hello, 2), (world, 1), (flink, 1),
and for the third window it should be (hello, 2), (flink, 1), (hadoop, 1).

But as you can see above, I get a different result. Am I misunderstanding the window? Could anyone help me understand this? Thanks.
Re: Confused window operation

Hequn Cheng
Hi Jeff,

The window is not a global window; it is tied to a specific key. After flatMap() and keyBy() you have 6 windows:
key: hello with 3 windows
key: world with 1 window
key: flink with 1 window
key: hadoop with 1 window
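
To make the per-key behaviour concrete, here is a minimal sketch in plain Scala (no Flink dependency) that mimics what countWindow(2, 1) appears to do on a keyed stream: for every incoming element it emits the sum over the last two elements seen for that same key. The object name KeyedCountWindowSketch and the helper slidingCountSums are made up for illustration; this is a simulation under that assumption, not Flink's actual implementation.

```scala
// Plain-Scala simulation of per-key sliding count windows (size = 2, slide = 1).
// KeyedCountWindowSketch and slidingCountSums are hypothetical names.
object KeyedCountWindowSketch {

  // For each incoming word, keep the last `size` counts seen for that word
  // and emit (word, sum of those counts). One result per element, i.e. slide = 1.
  def slidingCountSums(words: Seq[String], size: Int): Seq[(String, Int)] = {
    val buffers = scala.collection.mutable.Map.empty[String, Vector[Int]]
    words.map { w =>
      val recent = (buffers.getOrElse(w, Vector.empty[Int]) :+ 1).takeRight(size)
      buffers(w) = recent
      (w, recent.sum)
    }
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("hello world", "hello flink", "hello hadoop")
    val words = lines.flatMap(_.split("\\s"))
    // Prints (hello,1), (world,1), (hello,2), (flink,1), (hello,2), (hadoop,1)
    slidingCountSums(words, size = 2).foreach(println)
  }
}
```

The six results line up with the six windows listed above: three for hello (sums 1, 2, 2, because only the last two elements per key are kept) and one each for world, flink and hadoop.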

Best, Hequn


(In reply to Jeff Zhang's message of Wed, Nov 14, 2018 at 10:31 AM.)
Re: Confused window operation

Jeff Zhang
Thanks, Hequn & [hidden email]

(In reply to Hequn Cheng's message of Wed, Nov 14, 2018 at 2:17 PM.)


--
Best Regards

Jeff Zhang