Multiple stream operator watermark handling

Multiple stream operator watermark handling

Elias Levy
Is there a mechanism for a multiple stream operator to ignore watermarks from one of the streams?

The use case is a multiple stream operator that consumes a primary stream and a secondary control stream.  The control stream may receive messages only on rare occasions, and possibly never.  The default behavior of the operator is to emit only the lowest of the last watermarks received from each input stream.  That means that event time fails to advance if there are no control messages.

I also notice that FLIP-17, the Side Input proposal, does not address this issue, either in the Wiki or in the Google Docs.

Assuming there is no currently prescribed way to handle this, are folks taking care of this by introducing a new Assigner after the multiple input operator to generate watermarks?
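
The default behavior described above can be illustrated with a small stand-alone model. This is only a sketch of the "lowest of the last watermarks" rule, not Flink's actual operator code; the class and method names are hypothetical, and it assumes each input starts at Long.MinValue until its first watermark arrives.

```scala
// Simplified model of a two-input operator's watermark handling.
// Illustration only: names are hypothetical and this is not Flink code.
final class TwoInputWatermarkTracker {
  // Assumption: each input counts as Long.MinValue until it delivers a watermark.
  private var input1: Long = Long.MinValue
  private var input2: Long = Long.MinValue
  private var emitted: Long = Long.MinValue

  /** Records a watermark from input 1; returns the output watermark if it advanced. */
  def onWatermark1(ts: Long): Option[Long] = { input1 = ts; advance() }

  /** Records a watermark from input 2; returns the output watermark if it advanced. */
  def onWatermark2(ts: Long): Option[Long] = { input2 = ts; advance() }

  // The output watermark is the minimum over both inputs and never moves backwards.
  private def advance(): Option[Long] = {
    val combined = math.min(input1, input2)
    if (combined > emitted) { emitted = combined; Some(combined) } else None
  }
}
```

In this model, calling onWatermark1 repeatedly never produces an output watermark until onWatermark2 has been called at least once, which is the stall described in the post.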



Re: Multiple stream operator watermark handling

Piotr Nowojski
Hi,

Off the top of my head I can imagine two solutions:

1. Override the default behaviour of the operator, for example via org.apache.flink.streaming.api.datastream.ConnectedStreams#transform

2. Can you set the control stream’s watermark to Watermark#MAX_WATERMARK, or maybe Watermark#MAX_WATERMARK - 1?

Piotrek



Re: Multiple stream operator watermark handling

Elias Levy
On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski <[hidden email]> wrote:
> Off the top of my head I can imagine two solutions:
>
> 1. Override the default behaviour of the operator, for example via org.apache.flink.streaming.api.datastream.ConnectedStreams#transform

That seems the safer but more complicated path.


> 2. Can you set the control stream’s watermark to Watermark#MAX_WATERMARK, or maybe Watermark#MAX_WATERMARK - 1?

That seems simpler, but potentially perilous if at some point in the future there were a use for control stream watermarks.  Also, would it work if there are no messages in the control stream?  Wouldn't that mean no watermark would be emitted, even if they were hardcoded to Long.MAX_VALUE?  In that case the operator default for the stream would be used, which would still be Long.MIN_VALUE.


BTW, this reminds me of an issue I've mentioned previously: the documentation lacks a description of how watermarks are processed by operators.  E.g. when does a window emit watermarks?  What watermarks does it emit?  That seems like a rather large omission, as one of the main features of Flink is event time processing, which puts watermarks almost on an equal footing with data and data operations.  Just as the docs describe how data is processed, merged, etc., the same should be true for watermarks.



Re: Multiple stream operator watermark handling

Elias Levy
On Thu, May 24, 2018 at 9:20 AM, Elias Levy <[hidden email]> wrote:
> On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski <[hidden email]> wrote:
>> Off the top of my head I can imagine two solutions:
>>
>> 1. Override the default behaviour of the operator, for example via org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
>
> That seems the safer but more complicated path.

As we had already implemented the business logic in a RichCoFlatMapFunction, I ended up extending CoStreamFlatMap:

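import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap
import org.apache.flink.streaming.api.watermark.Watermark

class SingleWatermarkCoFlatMap[IN1, IN2, OUT](flatMapper: CoFlatMapFunction[IN1, IN2, OUT])
    extends CoStreamFlatMap[IN1, IN2, OUT](flatMapper) {

  // Pass through the watermarks from the first stream
  override def processWatermark1(mark: Watermark): Unit = processWatermark(mark)

  // Ignore watermarks from the second stream
  override def processWatermark2(mark: Watermark): Unit = {}
}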


Then it was easy to replace:

stream1
  .connect(stream2)
  .flatMap(new BusinessCoFlatMapFunction(params))
  .name("Operator")
  .uid("op")

with:

stream1
  .connect(stream2)
  .transform("Operator", new SingleWatermarkCoFlatMap[X, Y, Z](new BusinessCoFlatMapFunction(params)))
  .uid("op")
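
To make the effect of the two overrides concrete, here is a stand-alone model of the resulting watermark flow. It is a sketch with hypothetical names and does not depend on Flink; it only mirrors the idea that the first input's watermarks are forwarded as-is and the second input's are dropped.

```scala
// Simplified model of the "single watermark" behaviour.
// Illustration only: names are hypothetical and this is not Flink code.
final class PrimaryOnlyWatermarkTracker {
  /** Watermarks from the primary input are forwarded unchanged. */
  def onWatermark1(ts: Long): Option[Long] = Some(ts)

  /** Watermarks from the control input never reach the output. */
  def onWatermark2(ts: Long): Option[Long] = None
}
```

The trade-off is the one raised earlier in the thread: event time now advances with the primary stream alone, so control stream watermarks can no longer hold it back, even if some later use for them appears.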



Re: Multiple stream operator watermark handling

Piotr Nowojski
Great to hear that this worked out for you :)

The progression of watermarks on an empty stream is a known issue that we are working to resolve. The usually recommended workaround is to send a custom blank event (which should be ignored) once in a while.
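
One way to picture the blank-event workaround is a control message type with an explicit heartbeat variant that the business logic ignores. The sketch below is plain Scala with hypothetical names (ControlMessage, Heartbeat, UpdateRule, ControlHandler); it assumes the heartbeat carries a timestamp so that whatever assigns watermarks on the control stream has something to advance on.

```scala
// Hypothetical control-stream message types; none of these names come from Flink.
sealed trait ControlMessage { def timestamp: Long }

// Blank event: exists only to carry a timestamp so watermarks can progress.
final case class Heartbeat(timestamp: Long) extends ControlMessage

// A real control message that changes operator state.
final case class UpdateRule(timestamp: Long, rule: String) extends ControlMessage

object ControlHandler {
  /** Applies a control message to the rule set; heartbeats leave it unchanged. */
  def applyControl(rules: Set[String], msg: ControlMessage): Set[String] = msg match {
    case Heartbeat(_)        => rules
    case UpdateRule(_, rule) => rules + rule
  }
}
```

How often the heartbeat is sent bounds how far the control stream's watermark can lag, so the interval would need to suit the job's latency requirements.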

I have expanded the documentation:
Please check it, and if you have any further suggestions you are welcome to comment in the PR. I hope it clarifies the behaviour.

Piotrek
