Idle stream does not advance watermark in connected stream

Kien Truong
Hi all,
We are testing the new idleness detection feature in Flink 1.11; however, it does not work as we expected.
When we connect two data streams, one of which is idle, the output watermark of the CoProcessOperator does not increase, so the program cannot make progress.

I've made a small project to illustrate the problem: the watermark received by the sink does not increase at all until the idle source is stopped.

https://github.com/kien-truong/flink-idleness-testing

Is this a bug, or does idleness detection not support this use case?

Regards.
Kien

Re: Idle stream does not advance watermark in connected stream

Dawid Wysakowicz-2
Hi Kien,

I am afraid this is a valid bug. I am not 100% sure, but the way I
understand the code, the idleness mechanism applies to input channels,
e.g. when multiple parallel instances shuffle their results
to downstream operators.

In the case of a two-input operator, combining the watermarks of two
different upstream operators happens inside the operator itself.
There we do not have the idleness status: there is no status saying that
a whole upstream operator became idle. That's definitely a bug/limitation.
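To make the distinction concrete, here is a simplified, hypothetical model of channel-level idleness within a single logical input. It is loosely inspired by Flink's `StatusWatermarkValve` but is not its actual code: idle channels are skipped when taking the minimum, so one quiet parallel instance does not hold the input back. Nothing comparable happens one level up, where a two-input operator combines its two inputs.

```java
import java.util.Arrays;

// Hypothetical, simplified model of channel-level idleness inside ONE logical input.
// Loosely inspired by Flink's StatusWatermarkValve; this is not Flink's code.
class ChannelValveSketch {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;
    private long lastEmitted = Long.MIN_VALUE;

    ChannelValveSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        channelIdle = new boolean[numChannels]; // all channels start active
    }

    // A watermark on a channel also marks that channel active again.
    long onWatermark(int channel, long timestamp) {
        channelIdle[channel] = false;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
        return recompute();
    }

    // An idle channel stops holding back the minimum.
    long onIdle(int channel) {
        channelIdle[channel] = true;
        return recompute();
    }

    // Minimum over active channels only; the emitted watermark never moves backwards.
    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        if (anyActive && min > lastEmitted) {
            lastEmitted = min;
        }
        return lastEmitted;
    }

    public static void main(String[] args) {
        ChannelValveSketch valve = new ChannelValveSketch(2);
        System.out.println(valve.onWatermark(0, 100)); // Long.MIN_VALUE: channel 1 has sent nothing yet
        System.out.println(valve.onIdle(1));           // 100: channel 1 is now ignored
        System.out.println(valve.onWatermark(0, 200)); // 200
    }
}
```

The sketch deliberately omits what happens when every channel is idle; in that case the real runtime marks the whole input idle, which is exactly the status that does not reach the operator's two-input combination.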

I'm also cc'ing Aljoscha who could maybe confirm my analysis.

Best,

Dawid

In reply to Truong Duc Kien, 24/08/2020 16:00.

Re: Idle stream does not advance watermark in connected stream

Aljoscha Krettek
Yes, I'm afraid this analysis is correct. The StreamOperator,
AbstractStreamOperator to be specific, computes the combined watermark
from both inputs here:
https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573
The operator layer is not aware of idleness, so it will never notice.
Idleness is only tracked at the level of inputs and is never forwarded
to the operator itself.

To fix this, we would also have to make operators aware of idleness so
that they can take it into account when computing the combined output
watermark.
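One possible shape for such a fix, sketched as a hypothetical standalone class (not Flink API; all class, field, and method names are illustrative): track an idle flag per input and leave an idle input out of the minimum. When both inputs are idle, the sketch simply keeps the last combined watermark; a fuller design would presumably also mark the operator's own output idle.

```java
// Hypothetical sketch of an idleness-aware two-input watermark combiner.
// Not Flink API: class, field, and method names are illustrative only.
class IdleAwareTwoInputCombiner {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private boolean input1Idle = false;
    private boolean input2Idle = false;
    private long combinedWatermark = Long.MIN_VALUE;

    long processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        input1Idle = false; // a watermark implies the input is active again
        return combine();
    }

    long processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        input2Idle = false;
        return combine();
    }

    long setInput1Idle(boolean idle) {
        input1Idle = idle;
        return combine();
    }

    long setInput2Idle(boolean idle) {
        input2Idle = idle;
        return combine();
    }

    // Leaves idle inputs out of the minimum; never moves the combined watermark backwards.
    private long combine() {
        final long newMin;
        if (input1Idle && input2Idle) {
            return combinedWatermark; // nothing active to advance from
        } else if (input1Idle) {
            newMin = input2Watermark;
        } else if (input2Idle) {
            newMin = input1Watermark;
        } else {
            newMin = Math.min(input1Watermark, input2Watermark);
        }
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
        }
        return combinedWatermark;
    }

    public static void main(String[] args) {
        IdleAwareTwoInputCombiner op = new IdleAwareTwoInputCombiner();
        System.out.println(op.processWatermark1(1_000)); // Long.MIN_VALUE: input 2 still holds it back
        System.out.println(op.setInput2Idle(true));      // 1000
        System.out.println(op.processWatermark1(2_000)); // 2000
        System.out.println(op.processWatermark2(1_500)); // 2000: input 2 resumed behind, no regression
    }
}
```

The last call shows one design choice of the sketch: an input that resumes behind the current combined watermark does not pull it backwards.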

Best,
Aljoscha

In reply to Dawid Wysakowicz, 26.08.20 10:02.

Re: Idle stream does not advance watermark in connected stream

Arvid Heise-3
Hi Aljoscha,

I don't quite follow your analysis. If both sources are configured with idleness, they should send a periodic watermark on timeout.
So the code that you posted would receive watermarks from the idle source and thus advance the watermark periodically.

If an idle source does not emit a watermark at all, then I'm curious why it isn't mapped to StreamStatus.IDLE [1], which would trigger the desired behavior.

[1] https://github.com/apache/flink/blob/72cd5921684e6daac4a7dd7916698eeee98b56d5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L79
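For context: in Flink 1.11, source idleness is configured with `WatermarkStrategy#withIdleness(Duration)`. Once the timeout passes without records, the watermark output is marked idle instead of emitting further periodic watermarks, which is why a downstream two-input operator sees no new watermarks from that input. The class below is a hypothetical, simplified model of that timeout logic, not Flink's implementation; time is passed in explicitly (in milliseconds) so the behavior is deterministic.

```java
import java.time.Duration;

// Hypothetical, simplified model of source idleness detection (not Flink's code):
// if no record arrives within `timeout`, the source reports itself idle instead
// of emitting further watermarks. Time is passed in explicitly, in milliseconds.
class IdlenessTimerSketch {
    private final long timeoutMillis;
    private long lastActivityMillis;
    private boolean idle = false;

    IdlenessTimerSketch(Duration timeout, long nowMillis) {
        this.timeoutMillis = timeout.toMillis();
        this.lastActivityMillis = nowMillis;
    }

    // Any record resets the timer and makes the source active again.
    void onRecord(long nowMillis) {
        lastActivityMillis = nowMillis;
        idle = false;
    }

    // Checked on each periodic emit: true means "mark idle", false means "emit a watermark as usual".
    boolean checkIdle(long nowMillis) {
        if (!idle && nowMillis - lastActivityMillis >= timeoutMillis) {
            idle = true;
        }
        return idle;
    }

    public static void main(String[] args) {
        IdlenessTimerSketch timer = new IdlenessTimerSketch(Duration.ofSeconds(10), 0L);
        System.out.println(timer.checkIdle(5_000));  // false
        System.out.println(timer.checkIdle(10_000)); // true: no records for 10 s
        timer.onRecord(12_000);
        System.out.println(timer.checkIdle(15_000)); // false: active again
    }
}
```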


In reply to Aljoscha Krettek, Wed, Aug 26, 2020 at 5:46 PM.

--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Idle stream does not advance watermark in connected stream

Dawid Wysakowicz-2

Hey Arvid,

The problem is that StreamStatus.IDLE is set at the Task level; it is not propagated to the operator. The watermarks of a TwoInputStreamOperator are combined in AbstractStreamOperator:

    // Advances event-time timers, then forwards the watermark downstream.
    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }

    // Input 1: remember its latest watermark; emit only if min(input1, input2) advanced.
    public void processWatermark1(Watermark mark) throws Exception {
        input1Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

    // Input 2: symmetric to processWatermark1. There is no idleness check here,
    // so an input that stops sending watermarks pins the minimum.
    public void processWatermark2(Watermark mark) throws Exception {
        input2Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

At that point we do not know that, e.g., the whole of input 1 is idle. Therefore, if we stop receiving watermarks from it (because it became IDLE), the watermark does not progress from any two-input operator onwards. We are missing handling of the IDLE status similar to what exists at the task level, which works well for one-input operators and for multiple parallel upstream instances.
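The stall can be reproduced outside Flink with a standalone copy of the combination logic above. This is a sketch under stated assumptions: all three fields start at `Long.MIN_VALUE`, and timers plus the real output are replaced by a list of emitted watermark timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone re-creation of the two-input combination logic quoted above.
// Assumptions: all three fields start at Long.MIN_VALUE, and timers plus the real
// output are replaced by a list of emitted watermark timestamps.
class TwoInputStallDemo {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(combinedWatermark); // stands in for processWatermark(new Watermark(...))
        }
    }

    void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(combinedWatermark);
        }
    }

    public static void main(String[] args) {
        TwoInputStallDemo op = new TwoInputStallDemo();
        for (long ts = 1_000; ts <= 5_000; ts += 1_000) {
            op.processWatermark1(ts); // input 1 keeps advancing
        }
        System.out.println(op.emitted); // []: input 2 has never sent a watermark
        op.processWatermark2(3_000);
        System.out.println(op.emitted); // [3000]
    }
}
```

Five watermarks on input 1 produce no output because `Math.min` stays pinned to `Long.MIN_VALUE` by the silent input 2; the first watermark on input 2 releases `3000`.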

Best,

Dawid

In reply to Arvid Heise, 31/08/2020 11:05.

Re: Idle stream does not advance watermark in connected stream

Aljoscha Krettek
I can only agree with Dawid, who explained it better than me... 😅

Aljoscha

In reply to Dawid Wysakowicz, 31.08.20 12:10.

Re: Idle stream does not advance watermark in connected stream

Pierre Bedoucha
Hi, and thank you for this thread.

I'm also experiencing the same bug/limitation when connecting
streams.

I had a quick look at the announcements but couldn't find any more
information: is it planned to propagate the idle stream status to the
operator in an upcoming Flink minor version/release?

If not, is there a way around it other than emitting heartbeats in the idle
stream?

Best,
Pierre
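The heartbeat workaround mentioned above can be modeled as follows. This is a hypothetical sketch, not a Flink API: the quiet stream is fed synthetic heartbeat events stamped with the current time, so its bounded-out-of-orderness watermark (here, max timestamp minus the bound minus 1) keeps moving and stops pinning the downstream minimum. It assumes event time roughly tracks wall-clock time, and downstream functions would need to filter the heartbeats out.

```java
// Hypothetical model of the "heartbeat" workaround (not a Flink API).
// A quiet stream is fed synthetic heartbeat events stamped with the current time,
// so its watermark keeps advancing and no longer pins a downstream min().
class HeartbeatWorkaroundSketch {

    // Bounded-out-of-orderness generator: watermark = max timestamp - bound - 1.
    static final class BoundedGenerator {
        private final long outOfOrdernessMillis;
        private long maxTimestamp;

        BoundedGenerator(long outOfOrdernessMillis) {
            this.outOfOrdernessMillis = outOfOrdernessMillis;
            // Chosen so the initial watermark is exactly Long.MIN_VALUE (no underflow).
            this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        }

        // Real records and heartbeats are treated the same way here.
        void onEvent(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        long currentWatermark() {
            return maxTimestamp - outOfOrdernessMillis - 1;
        }
    }

    // What a two-input operator would forward: the minimum of both inputs.
    static long combined(BoundedGenerator a, BoundedGenerator b) {
        return Math.min(a.currentWatermark(), b.currentWatermark());
    }

    public static void main(String[] args) {
        BoundedGenerator busy = new BoundedGenerator(1_000);
        BoundedGenerator quiet = new BoundedGenerator(1_000);
        busy.onEvent(20_000);
        System.out.println(combined(busy, quiet)); // Long.MIN_VALUE: the quiet stream pins it
        quiet.onEvent(20_000); // heartbeat stamped with "now" (assumed to be 20_000 here)
        System.out.println(combined(busy, quiet)); // 18999
    }
}
```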

In reply to Aljoscha Krettek.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Idle stream does not advance watermark in connected stream

rmetzger0
Hi Pierre,

It seems that the community is working on a fix for the next 1.11 bugfix release (and for 1.12). You can follow the status of the ticket here: https://issues.apache.org/jira/browse/FLINK-18934

Best,
Robert 

In reply to Pierre Bedoucha, Thu, Sep 10, 2020 at 11:00 AM.