question regarding windowed stream

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

question regarding windowed stream

Balaji Rajagopalan
I have a requirement to aggregate one data stream every 5 minutes and a different data stream every 1 minute. I wrote some example code to test this, but the behavior differs from what I expected. I expected window2 to be called 5 times and window1 to be called once, but in a 5-minute interval window1 was called once and window2 was called only once. Have I misunderstood how the windowed function works? Does the input play a role in the number of times a window's apply function is called? I use the nc command to write to socket ports 9999 and 9998.



import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow}

import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.windowing.windows.Window


/**
 * Word-count style demo that windows two socket text streams with different tumbling window
 * sizes: input from port 9999 is grouped into 5-minute windows, input from port 9998 into
 * 1-minute windows. Each fired window emits one summary line, which is written to stdout by
 * the `print()` sinks.
 *
 * A window only fires if at least one element was assigned to it; intervals in which no input
 * arrives on a socket produce no window and therefore no output line.
 */
object WindowWordCount {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Ingestion time: records are timestamped as they enter the sources, so the event-time
    // window assigners below effectively window by arrival time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val text = env.socketTextStream("localhost", 9999)
    val text1 = env.socketTextStream("localhost", 9998)

    val stream: DataStream[String] = text.flatMap { line => tokenize(line) }
    val count = stream
      .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
      .apply { new MyAllWindowFunction }
    count.print()

    val counts1 = text1
      .flatMap { line => tokenize(line) }
      .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
      .apply { new MyAllWindowFunction2 }
    counts1.print()

    env.execute("Window Stream WordCount")
  }

  /** Lower-cases `line`, splits it on runs of non-word characters and drops empty tokens. */
  private def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  /** Builds the summary line for a fired window: its label, time bounds and element count. */
  private def describe(label: String, window: TimeWindow, input: scala.Iterable[String]): String =
    s"timed $label is called for [${window.getStart}, ${window.getEnd}) with ${input.size} words"

  /** Window function for the 5-minute windows; emits one summary line per fired window. */
  class MyAllWindowFunction extends AllWindowFunction[String, String, TimeWindow] {
    def apply(
        window: TimeWindow,
        input: scala.Iterable[String],
        out: org.apache.flink.util.Collector[String]): Unit =
      // Emit through the collector (not System.out) so the downstream print() sink has output.
      out.collect(describe("window1", window, input))
  }

  /** Window function for the 1-minute windows; emits one summary line per fired window. */
  class MyAllWindowFunction2 extends AllWindowFunction[String, String, TimeWindow] {
    def apply(
        window: TimeWindow,
        input: scala.Iterable[String],
        out: org.apache.flink.util.Collector[String]): Unit =
      // Emit through the collector (not System.out) so the downstream print() sink has output.
      out.collect(describe("window2", window, input))
  }
}

The output was:
timed window2 is called
timed window1 is called
Reply | Threaded
Open this post in threaded view
|

Re: question regarding windowed stream

Aljoscha Krettek
Hi,
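Yes, the input does indeed play a role. If no elements arrive, no window is created, so nothing fires. A window comes into existence only when the first element that belongs to it arrives. A 1-minute window therefore fires only for minutes in which at least one element was received.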

Cheers,
Aljoscha

On Fri, 6 May 2016 at 12:18 Balaji Rajagopalan <[hidden email]> wrote:
I have a requirement to aggregate one data stream every 5 minutes and a different data stream every 1 minute. I wrote some example code to test this, but the behavior differs from what I expected. I expected window2 to be called 5 times and window1 to be called once, but in a 5-minute interval window1 was called once and window2 was called only once. Have I misunderstood how the windowed function works? Does the input play a role in the number of times a window's apply function is called? I use the nc command to write to socket ports 9999 and 9998.



import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow}

import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.windowing.windows.Window


/**
 * Word-count style demo that windows two socket text streams with different tumbling window
 * sizes: input from port 9999 is grouped into 5-minute windows, input from port 9998 into
 * 1-minute windows. Each fired window emits one summary line, which is written to stdout by
 * the `print()` sinks.
 *
 * A window only fires if at least one element was assigned to it; intervals in which no input
 * arrives on a socket produce no window and therefore no output line.
 */
object WindowWordCount {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Ingestion time: records are timestamped as they enter the sources, so the event-time
    // window assigners below effectively window by arrival time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val text = env.socketTextStream("localhost", 9999)
    val text1 = env.socketTextStream("localhost", 9998)

    val stream: DataStream[String] = text.flatMap { line => tokenize(line) }
    val count = stream
      .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
      .apply { new MyAllWindowFunction }
    count.print()

    val counts1 = text1
      .flatMap { line => tokenize(line) }
      .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
      .apply { new MyAllWindowFunction2 }
    counts1.print()

    env.execute("Window Stream WordCount")
  }

  /** Lower-cases `line`, splits it on runs of non-word characters and drops empty tokens. */
  private def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  /** Builds the summary line for a fired window: its label, time bounds and element count. */
  private def describe(label: String, window: TimeWindow, input: scala.Iterable[String]): String =
    s"timed $label is called for [${window.getStart}, ${window.getEnd}) with ${input.size} words"

  /** Window function for the 5-minute windows; emits one summary line per fired window. */
  class MyAllWindowFunction extends AllWindowFunction[String, String, TimeWindow] {
    def apply(
        window: TimeWindow,
        input: scala.Iterable[String],
        out: org.apache.flink.util.Collector[String]): Unit =
      // Emit through the collector (not System.out) so the downstream print() sink has output.
      out.collect(describe("window1", window, input))
  }

  /** Window function for the 1-minute windows; emits one summary line per fired window. */
  class MyAllWindowFunction2 extends AllWindowFunction[String, String, TimeWindow] {
    def apply(
        window: TimeWindow,
        input: scala.Iterable[String],
        out: org.apache.flink.util.Collector[String]): Unit =
      // Emit through the collector (not System.out) so the downstream print() sink has output.
      out.collect(describe("window2", window, input))
  }
}

The output was:
timed window2 is called
timed window1 is called