Why tuples are not ignored after watermark?

Saiph Kappa
Hi,

I have a streaming (event-time) application in which all events are assigned the same timestamp. I receive 10000 events in total within a 5-minute window, but I emit a watermark once 9000 elements have been received. This watermark is 6 minutes after the assigned timestamp. My question is: why does the function associated with the window read 10000 elements and not 9000? The remaining 1000 elements have a timestamp lower than the watermark and should be ignored, but that is not happening.

Here is part of the code:
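«
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val rawStream = env.socketTextStream("localhost", 4321)

val punctuatedAssigner = new AssignerWithPunctuatedWatermarks[(String, Int, Long)] {
      // Every element gets the same timestamp: the time this assigner object was constructed.
      val timestamp = System.currentTimeMillis()

      override def extractTimestamp(element: (String, Int, Long), previousElementTimestamp: Long): Long =
        timestamp

      override def checkAndGetNextWatermark(lastElement: (String, Int, Long), extractedTimestamp: Long): Watermark = {
        // Emit a watermark 6 minutes past the shared timestamp on the element whose third field is 9000.
        if (lastElement._3 == 9000) {
          val ts = extractedTimestamp + TimeUnit.MINUTES.toMillis(6)
          new Watermark(ts)
        } else null
      }
    }

// Parse each "p1 p2 p3" line into a (String, Int, Long) tuple.
val stream = rawStream.map(line => {
      val Array(p1, p2, p3) = line.split(" ")
      (p1, p2.toInt, p3.toLong)
    })
      .assignTimestampsAndWatermarks(punctuatedAssigner)

stream.keyBy(1).timeWindow(Time.of(5, TimeUnit.MINUTES)).apply(function)
»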

Thanks!
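
To make the arithmetic in the question concrete, here is a minimal sketch in plain Scala (no Flink dependency). It assumes tumbling windows aligned to the epoch with zero offset, so the window start is the timestamp rounded down to a multiple of the window size. `WindowBoundsDemo`, its helper names, and the sample timestamp are hypothetical. Under that assumption, a watermark 6 minutes past the timestamp always lies beyond the end of the 5-minute window that contains the timestamp.

```scala
object WindowBoundsDemo {
  val windowSizeMs: Long = 5 * 60 * 1000L
  val watermarkDelayMs: Long = 6 * 60 * 1000L

  // Start of the epoch-aligned tumbling window containing `ts`
  // (assumption: zero window offset and ts >= 0).
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // Exclusive end of that window.
  def windowEnd(ts: Long): Long = windowStart(ts) + windowSizeMs

  // True if a watermark emitted at ts + 6 minutes has passed the window end.
  def watermarkPassesWindow(ts: Long): Boolean =
    ts + watermarkDelayMs >= windowEnd(ts)

  def main(args: Array[String]): Unit = {
    val ts = 1473438540000L // hypothetical shared event timestamp
    println(s"window = [${windowStart(ts)}, ${windowEnd(ts)})")
    println(s"watermark passes window end: ${watermarkPassesWindow(ts)}")
  }
}
```

Because the window end is at most 5 minutes after any timestamp inside the window, the check holds for every non-negative timestamp, not only the sample value.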

Re: Why tuples are not ignored after watermark?

Aljoscha Krettek
Hi,
the problem might be that your timestamp/watermark assigner runs in parallel and only one of its parallel instances emits the watermark, because only that instance sees the element with _3 == 9000. For the watermark to advance at an operator, it needs to advance in all upstream parallel instances.

Cheers,
Aljoscha
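
The rule described above can be sketched in plain Scala as a simplified model, not Flink's actual implementation. The sketch assumes an operator's event-time clock is the minimum of the latest watermarks received from its upstream instances, and that an instance which has emitted nothing counts as `Long.MinValue`. `WatermarkMinDemo`, `NoWatermark`, `downstreamWatermark`, and the sample timestamp are hypothetical names and values.

```scala
object WatermarkMinDemo {
  // Marker for "no watermark received yet" from an upstream instance.
  val NoWatermark: Long = Long.MinValue

  // Simplified model: the downstream watermark is the minimum over all inputs.
  def downstreamWatermark(perInstance: Seq[Long]): Long = perInstance.min

  def main(args: Array[String]): Unit = {
    val eventTimestamp = 1473438540000L // hypothetical shared event timestamp
    val emitted = eventTimestamp + 6 * 60 * 1000L

    // Four assigner instances, only one of which saw the element with _3 == 9000.
    val onlyOneEmits = Seq(NoWatermark, NoWatermark, NoWatermark, emitted)
    println(downstreamWatermark(onlyOneEmits) == NoWatermark) // prints "true"

    // If every instance had emitted the watermark, the window operator would see it.
    val allEmit = Seq.fill(4)(emitted)
    println(downstreamWatermark(allEmit) == emitted) // prints "true"
  }
}
```

In this model the window operator never observes the 6-minute watermark while any upstream instance stays silent, so no element is treated as late.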

(In reply to Saiph Kappa's message of Fri, 9 Sep 2016 at 18:29.)

Re: Why tuples are not ignored after watermark?

Saiph Kappa
And is it possible to share state across parallel instances with AssignerWithPunctuatedWatermarks?

Thanks!

(In reply to Aljoscha Krettek's message of Wed, Sep 14, 2016 at 9:52 AM.)

Re: Why tuples are not ignored after watermark?

Fabian Hueske-2
No, this is not possible unless you use an external service such as a database.
The assigners might run on different machines, and Flink does not provide utilities for reading and writing shared state.

Best, Fabian
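
A small simulation in plain Scala illustrates why purely local assigner state is not enough. It assumes, for illustration only, that elements are distributed round-robin over the parallel assigner instances. `LocalStateDemo`, `Assigner`, and `run` are hypothetical names and do not model Flink's API.

```scala
object LocalStateDemo {
  // One simulated assigner instance; its fields are visible only to itself.
  final class Assigner {
    var seen: Int = 0
    var emittedWatermark: Boolean = false

    def process(sequenceNumber: Long): Unit = {
      seen += 1
      // Same trigger condition as in the question: the element numbered 9000.
      if (sequenceNumber == 9000L) emittedWatermark = true
    }
  }

  // Sends sequence numbers 1..total round-robin to `parallelism` instances
  // (assumption: round-robin distribution).
  def run(parallelism: Int, total: Int): Vector[Assigner] = {
    val instances = Vector.fill(parallelism)(new Assigner)
    (1 to total).foreach { i =>
      instances((i - 1) % parallelism).process(i.toLong)
    }
    instances
  }

  def main(args: Array[String]): Unit = {
    // With four instances, each one sees only a quarter of the elements.
    run(4, 10000).foreach(a => println(s"seen=${a.seen} emitted=${a.emittedWatermark}"))
    // With a single instance, that instance sees every element.
    run(1, 10000).foreach(a => println(s"seen=${a.seen} emitted=${a.emittedWatermark}"))
  }
}
```

In this model, exactly one of the four instances emits the watermark, and none of them can tell how many elements the others have seen. If a single instance processes the whole stream, for example because the assigner runs with a parallelism of 1, no shared state is needed.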

(In reply to Saiph Kappa's message of 2016-09-15 20:17 GMT+02:00.)