Window start and end issue with TumblingProcessingTimeWindows

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Window start and end issue with TumblingProcessingTimeWindows

Soumya Simanta
I've a simple program which takes some inputs from a command line (Socket stream) and then aggregates based on the key. 

When running this program on my local machine I see some output that is counter intuitive to my understanding of windows in Flink. 

The start time of the Window is around the time the Functions are being evaluated. However, the window end time is around 60 s (window size) after the current time (please see below).  

Can someone explain this behaviour please? 
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class EventAgg(start: Long, end: Long, key: String, value: Int)

object Processor {

val window_length = 60000 // milliseconds

def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
var sum = 0
for (e <- in) {
sum = sum + e.value
}
val start = window.getStart
val end = window.getEnd
val diff = (end - start)
println(s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")


out.collect(
new EventAgg(
start = window.getStart,
end = window.getEnd,
key = key,
value = sum
)
)
}

def main(Args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val sevents = env.socketTextStream("localhost", 9000)
sevents
.map(x => parseEvent(x))
.keyBy(_.key)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
.apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
.map("Default Assigner: " + System.currentTimeMillis().toString + " - " + _.toString)
.print()

env.execute("Event time windows")
}

def parseEvent(s: String): Event = {
if (s == null || s.trim().length == 0)
Event("default", 0, 0L)
else {
val parts = s.split(",")
Event(parts(0), parts(1).toInt, 1L)
}
}
}

Output

 windowId: -663519360 currenttime: 1465234200007 key:[a] start: 1465234200000 end: 1465234260000 diff: 60000
 windowId: -663519360 currenttime: 1465234200006 key:[b] start: 1465234200000 end: 1465234260000 diff: 60000
3> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,a,3)
7> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,b,4)


Reply | Threaded
Open this post in threaded view
|

Re: Window start and end issue with TumblingProcessingTimeWindows

Chesnay Schepler
could you state a specific problem?

On 07.06.2016 06:40, Soumya Simanta wrote:
I've a simple program which takes some inputs from a command line (Socket stream) and then aggregates based on the key. 

When running this program on my local machine I see some output that is counter intuitive to my understanding of windows in Flink. 

The start time of the Window is around the time the Functions are being evaluated. However, the window end time is around 60 s (window size) after the current time (please see below).  

Can someone explain this behaviour please? 
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class EventAgg(start: Long, end: Long, key: String, value: Int)

object Processor {

  val window_length = 60000 // milliseconds

  def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
    var sum = 0
    for (e <- in) {
      sum = sum + e.value
    }
    val start = window.getStart
    val end = window.getEnd
    val diff = (end - start)
    println(s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")


    out.collect(
      new EventAgg(
        start = window.getStart,
        end = window.getEnd,
        key = key,
        value = sum
      )
    )
  }

  def main(Args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val sevents = env.socketTextStream("localhost", 9000)
    sevents
      .map(x => parseEvent(x))
      .keyBy(_.key)
      .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
      .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
      .map("Default Assigner: " + System.currentTimeMillis().toString + " - " + _.toString)
      .print()

    env.execute("Event time windows")
  }

  def parseEvent(s: String): Event = {
    if (s == null || s.trim().length == 0)
      Event("default", 0, 0L)
    else {
      val parts = s.split(",")
      Event(parts(0), parts(1).toInt, 1L)
    }
  }
}

Output

 windowId: -663519360 currenttime: 1465234200007 key:[a] start: 1465234200000 end: 1465234260000 diff: 60000
 windowId: -663519360 currenttime: 1465234200006 key:[b] start: 1465234200000 end: 1465234260000 diff: 60000
3> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,a,3)
7> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,b,4)



Reply | Threaded
Open this post in threaded view
|

Re: Window start and end issue with TumblingProcessingTimeWindows

Soumya Simanta
The problem is why is the window end time in the future ? 

For example if my window size is 60 seconds and my window is being evaluated at 3.00 pm then why is the window end time 3.01 pm and not 3.00 pm even when the data that is being evaluated falls in the window 2.59 - 3.00. 

Sent from my iPhone

On Jun 7, 2016, at 3:47 PM, Chesnay Schepler <[hidden email]> wrote:

could you state a specific problem?

On 07.06.2016 06:40, Soumya Simanta wrote:
I've a simple program which takes some inputs from a command line (Socket stream) and then aggregates based on the key. 

When running this program on my local machine I see some output that is counter intuitive to my understanding of windows in Flink. 

The start time of the Window is around the time the Functions are being evaluated. However, the window end time is around 60 s (window size) after the current time (please see below).  

Can someone explain this behaviour please? 
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class EventAgg(start: Long, end: Long, key: String, value: Int)

object Processor {

  val window_length = 60000 // milliseconds

  def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
    var sum = 0
    for (e <- in) {
      sum = sum + e.value
    }
    val start = window.getStart
    val end = window.getEnd
    val diff = (end - start)
    println(s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")


    out.collect(
      new EventAgg(
        start = window.getStart,
        end = window.getEnd,
        key = key,
        value = sum
      )
    )
  }

  def main(Args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val sevents = env.socketTextStream("localhost", 9000)
    sevents
      .map(x => parseEvent(x))
      .keyBy(_.key)
      .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
      .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
      .map("Default Assigner: " + System.currentTimeMillis().toString + " - " + _.toString)
      .print()

    env.execute("Event time windows")
  }

  def parseEvent(s: String): Event = {
    if (s == null || s.trim().length == 0)
      Event("default", 0, 0L)
    else {
      val parts = s.split(",")
      Event(parts(0), parts(1).toInt, 1L)
    }
  }
}

Output

 windowId: -663519360 currenttime: 1465234200007 key:[a] start: 1465234200000 end: 1465234260000 diff: 60000
 windowId: -663519360 currenttime: 1465234200006 key:[b] start: 1465234200000 end: 1465234260000 diff: 60000
3> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,a,3)
7> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,b,4)



Reply | Threaded
Open this post in threaded view
|

Re: Window start and end issue with TumblingProcessingTimeWindows

Aljoscha Krettek
Hi,
I'm afraid you're running into a bug into the special processing-time window operator. A suggested workaround would be to switch to characteristic IngestionTime and use TumblingEventTimeWindows.

I also open a Jira issue for the bug so that we can keep track of it: https://issues.apache.org/jira/browse/FLINK-4028

Cheers,
Aljoscha

On Tue, 7 Jun 2016 at 14:57 Soumya Simanta <[hidden email]> wrote:
The problem is why is the window end time in the future ? 

For example if my window size is 60 seconds and my window is being evaluated at 3.00 pm then why is the window end time 3.01 pm and not 3.00 pm even when the data that is being evaluated falls in the window 2.59 - 3.00. 

Sent from my iPhone

On Jun 7, 2016, at 3:47 PM, Chesnay Schepler <[hidden email]> wrote:

could you state a specific problem?

On 07.06.2016 06:40, Soumya Simanta wrote:
I've a simple program which takes some inputs from a command line (Socket stream) and then aggregates based on the key. 

When running this program on my local machine I see some output that is counter intuitive to my understanding of windows in Flink. 

The start time of the Window is around the time the Functions are being evaluated. However, the window end time is around 60 s (window size) after the current time (please see below).  

Can someone explain this behaviour please? 
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class EventAgg(start: Long, end: Long, key: String, value: Int)

object Processor {

  val window_length = 60000 // milliseconds

  def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
    var sum = 0
    for (e <- in) {
      sum = sum + e.value
    }
    val start = window.getStart
    val end = window.getEnd
    val diff = (end - start)
    println(s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")


    out.collect(
      new EventAgg(
        start = window.getStart,
        end = window.getEnd,
        key = key,
        value = sum
      )
    )
  }

  def main(Args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val sevents = env.socketTextStream("localhost", 9000)
    sevents
      .map(x => parseEvent(x))
      .keyBy(_.key)
      .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
      .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
      .map("Default Assigner: " + System.currentTimeMillis().toString + " - " + _.toString)
      .print()

    env.execute("Event time windows")
  }

  def parseEvent(s: String): Event = {
    if (s == null || s.trim().length == 0)
      Event("default", 0, 0L)
    else {
      val parts = s.split(",")
      Event(parts(0), parts(1).toInt, 1L)
    }
  }
}

Output

 windowId: -663519360 currenttime: 1465234200007 key:[a] start: 1465234200000 end: 1465234260000 diff: 60000
 windowId: -663519360 currenttime: 1465234200006 key:[b] start: 1465234200000 end: 1465234260000 diff: 60000
3> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,a,3)
7> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,b,4)



Reply | Threaded
Open this post in threaded view
|

Re: Window start and end issue with TumblingProcessingTimeWindows

Soumya Simanta
Thanks for the clarification. 

On Tue, Jun 7, 2016 at 9:15 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
I'm afraid you're running into a bug into the special processing-time window operator. A suggested workaround would be to switch to characteristic IngestionTime and use TumblingEventTimeWindows.

I also open a Jira issue for the bug so that we can keep track of it: https://issues.apache.org/jira/browse/FLINK-4028

Cheers,
Aljoscha

On Tue, 7 Jun 2016 at 14:57 Soumya Simanta <[hidden email]> wrote:
The problem is why is the window end time in the future ? 

For example if my window size is 60 seconds and my window is being evaluated at 3.00 pm then why is the window end time 3.01 pm and not 3.00 pm even when the data that is being evaluated falls in the window 2.59 - 3.00. 

Sent from my iPhone

On Jun 7, 2016, at 3:47 PM, Chesnay Schepler <[hidden email]> wrote:

could you state a specific problem?

On 07.06.2016 06:40, Soumya Simanta wrote:
I've a simple program which takes some inputs from a command line (Socket stream) and then aggregates based on the key. 

When running this program on my local machine I see some output that is counter intuitive to my understanding of windows in Flink. 

The start time of the Window is around the time the Functions are being evaluated. However, the window end time is around 60 s (window size) after the current time (please see below).  

Can someone explain this behaviour please? 
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class EventAgg(start: Long, end: Long, key: String, value: Int)

object Processor {

  val window_length = 60000 // milliseconds

  def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
    var sum = 0
    for (e <- in) {
      sum = sum + e.value
    }
    val start = window.getStart
    val end = window.getEnd
    val diff = (end - start)
    println(s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")


    out.collect(
      new EventAgg(
        start = window.getStart,
        end = window.getEnd,
        key = key,
        value = sum
      )
    )
  }

  def main(Args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val sevents = env.socketTextStream("localhost", 9000)
    sevents
      .map(x => parseEvent(x))
      .keyBy(_.key)
      .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
      .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
      .map("Default Assigner: " + System.currentTimeMillis().toString + " - " + _.toString)
      .print()

    env.execute("Event time windows")
  }

  def parseEvent(s: String): Event = {
    if (s == null || s.trim().length == 0)
      Event("default", 0, 0L)
    else {
      val parts = s.split(",")
      Event(parts(0), parts(1).toInt, 1L)
    }
  }
}

Output

 windowId: -663519360 currenttime: 1465234200007 key:[a] start: 1465234200000 end: 1465234260000 diff: 60000
 windowId: -663519360 currenttime: 1465234200006 key:[b] start: 1465234200000 end: 1465234260000 diff: 60000
3> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,a,3)
7> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,b,4)