Hi all,

We are testing the new idleness detection feature in Flink 1.11; however, it does not work as we expected: when we connect two data streams, one of which is idle, the output watermark of the CoProcessOperator does not increase, so the program cannot make progress.

I've made a small project to illustrate the problem:

https://github.com/kien-truong/flink-idleness-testing

The watermark received by the sink does not increase at all until the idle source is stopped.

Is this a bug, or does idleness detection not support this use case?

Regards,
Kien
Hi Kien,
I am afraid this is a valid bug. I am not 100% sure, but the way I understand the code, the idleness mechanism applies to input channels, i.e. to the case where multiple parallel instances shuffle their results to a downstream operator.

In the case of a two-input operator, the watermarks of two different upstream operators are combined inside the operator itself. There we do not have the idleness status: we have no status saying that a whole upstream operator became idle. That's definitely a bug/limitation.

I'm also cc'ing Aljoscha, who could maybe confirm my analysis.

Best,
Dawid

On 24/08/2020 16:00, Truong Duc Kien wrote:
> Is this a bug or does the idleness detection not support this use case?
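To make the channel-level behaviour Dawid describes more concrete, here is a rough, self-contained Java model of it: within one task, a channel that has been marked idle is simply left out when the minimum watermark across channels is computed. The class name ChannelWatermarkValve and its methods are hypothetical and heavily simplified (for example, a channel that becomes active again is re-included immediately); this is a sketch under those assumptions, not Flink's actual implementation.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model of channel-level idleness inside one task.
 * Idle channels are excluded from the minimum; this is NOT Flink's real code.
 */
class ChannelWatermarkValve {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;
    private long lastEmitted = Long.MIN_VALUE;

    ChannelWatermarkValve(int numChannels) {
        channelWatermarks = new long[numChannels];
        channelIdle = new boolean[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the task-level watermark. */
    long onWatermark(int channel, long timestamp) {
        channelIdle[channel] = false; // receiving a watermark makes the channel active again
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
        return recompute();
    }

    /** Marks a channel idle so that it no longer holds back the minimum. */
    long onIdle(int channel) {
        channelIdle[channel] = true;
        return recompute();
    }

    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // Only move forward; if every channel is idle, keep the last value.
        if (anyActive && min > lastEmitted) {
            lastEmitted = min;
        }
        return lastEmitted;
    }

    public static void main(String[] args) {
        ChannelWatermarkValve valve = new ChannelWatermarkValve(2);
        System.out.println(valve.onWatermark(0, 100L)); // channel 1 silent: Long.MIN_VALUE
        System.out.println(valve.onIdle(1));            // channel 1 idle: 100
    }
}
```

With two channels, a watermark of 100 on channel 0 cannot advance the task watermark while channel 1 is silent, but it does as soon as channel 1 is marked idle. The thread's point is that no comparable bookkeeping exists between the two logical inputs of a two-input operator.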
Yes, I'm afraid this analysis is correct. The StreamOperator (AbstractStreamOperator, to be specific) computes the combined watermark from both inputs here:

https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573

The operator layer is not aware of idleness, so it will never notice. Idleness only works at the level of inputs and is never forwarded to the operator itself. To fix this, we would also have to make operators aware of idleness so that they can take it into account when computing the combined output watermark.

Best,
Aljoscha

On 26.08.20 10:02, Dawid Wysakowicz wrote:
> In case of a two input operator, combining the watermark of two
> different upstream operators happens inside of the operator itself.
> There we do not have the idleness status.
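A minimal sketch of the kind of fix Aljoscha outlines, assuming the operator is told when each of its two inputs becomes idle. IdleAwareTwoInputCombiner, markInput1Idle and markInput2Idle are hypothetical names, and plain longs stand in for Watermark objects; the combination rule mirrors the min-of-both-inputs logic in AbstractStreamOperator, extended so that an idle input no longer holds the minimum back.

```java
/**
 * Hypothetical idleness-aware variant of the two-input watermark combination.
 * Method names echo AbstractStreamOperator for readability; the idle flags are
 * the assumed extension and do not come from Flink.
 */
class IdleAwareTwoInputCombiner {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private boolean input1Idle = false;
    private boolean input2Idle = false;
    private long combinedWatermark = Long.MIN_VALUE;

    long processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        input1Idle = false; // a watermark reactivates the input
        return recompute();
    }

    long processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        input2Idle = false;
        return recompute();
    }

    long markInput1Idle() { input1Idle = true; return recompute(); }

    long markInput2Idle() { input2Idle = true; return recompute(); }

    /** Minimum over the active inputs; returns the current combined watermark. */
    private long recompute() {
        long newMin;
        if (input1Idle && input2Idle) {
            return combinedWatermark; // nothing active: keep the current watermark
        } else if (input1Idle) {
            newMin = input2Watermark;
        } else if (input2Idle) {
            newMin = input1Watermark;
        } else {
            newMin = Math.min(input1Watermark, input2Watermark);
        }
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin; // watermarks only ever move forward
        }
        return combinedWatermark;
    }

    public static void main(String[] args) {
        IdleAwareTwoInputCombiner op = new IdleAwareTwoInputCombiner();
        System.out.println(op.processWatermark1(1_000L)); // input 2 silent: still Long.MIN_VALUE
        System.out.println(op.markInput2Idle());          // input 2 idle: advances to 1000
    }
}
```

When both inputs are idle, the sketch keeps the current watermark instead of jumping ahead, and a watermark arriving on an idle input makes it active again. That is only one plausible policy; the real fix may handle these cases differently.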
Hi Aljoscha,

I don't quite follow your analysis. If both sources are configured with idleness, they should send a periodic watermark on timeout, so the code that you posted would receive watermarks from the idle source and thus advance the watermark periodically.

If an idle source does not emit a watermark at all, then I'm curious why it's not mapped to StreamStatus.IDLE [1], which would trigger the desired behavior.

[1] https://github.com/apache/flink/blob/72cd5921684e6daac4a7dd7916698eeee98b56d5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L79

--
Arvid Heise | Senior Java Developer

Follow us @VervericaData

--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng

On Wed, Aug 26, 2020 at 5:46 PM Aljoscha Krettek wrote:
> Yes, I'm afraid this analysis is correct.
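Hey Arvid,

The problem is that StreamStatus.IDLE is set on the task level; it is not propagated to the operator. Combining the watermarks of a TwoInputStreamOperator happens in AbstractStreamOperator:

    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }

    public void processWatermark1(Watermark mark) throws Exception {
        input1Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

    public void processWatermark2(Watermark mark) throws Exception {
        input2Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

There we do not know that, e.g., the whole of input 1 is idle. Therefore, if we do not receive any watermarks from it (because it became IDLE), the watermark does not progress past any two-input operator. We are missing handling of the IDLE status similar to the one at the task level, which works well for one-input operators and multiple parallel upstream instances.

Best,
Dawid

On 31/08/2020 11:05, Arvid Heise wrote:
> I don't quite follow your analysis.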
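The stall can be reproduced without a Flink cluster using a plain-Java model of the processWatermark1/processWatermark2 logic from AbstractStreamOperator that Dawid quotes. In this sketch the timer service and output are replaced by a list of emitted timestamps, and both inputs are assumed to start at Long.MIN_VALUE; TwoInputWatermarkModel is a hypothetical name used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Standalone model of the quoted two-input watermark combination.
 * The "emitted" list stands in for output.emitWatermark(...).
 */
class TwoInputWatermarkModel {
    private long input1Watermark = Long.MIN_VALUE; // assumed initial value
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        maybeEmit();
    }

    void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        maybeEmit();
    }

    /** Same rule as the quoted code: emit only when the minimum of both inputs grows. */
    private void maybeEmit() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(combinedWatermark);
        }
    }

    public static void main(String[] args) {
        TwoInputWatermarkModel op = new TwoInputWatermarkModel();
        for (long ts = 1_000L; ts <= 5_000L; ts += 1_000L) {
            op.processWatermark1(ts); // only input 1 is active
        }
        System.out.println(op.emitted); // input 2 never sent a watermark: prints []
    }
}
```

Running main prints [] because input2Watermark never leaves Long.MIN_VALUE, so the minimum never exceeds combinedWatermark. A single watermark on input 2, e.g. processWatermark2(3_000L), immediately emits 3000, which is why emitting heartbeats into the otherwise idle stream unblocks the operator.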
I can only agree with Dawid, who explained it better than me... 😅
Aljoscha

On 31.08.20 12:10, Dawid Wysakowicz wrote:
> Hey Arvid,
>
> The problem is that the StreamStatus.IDLE is set on the Task level. It
> is not propagated to the operator.
Hi and thank you for this thread,
I'm also experiencing the same bug/limitation when connecting streams. I had a quick look at the announcements but couldn't find any more information. Is it planned to propagate the idle stream status to the operator in upcoming Flink minor versions/releases? If not, is there a way around it other than emitting heartbeats in the idle stream?

Best,
Pierre

Aljoscha Krettek wrote:
> I can only agree with Dawid, who explained it better than me... 😅
Hi Pierre,

It seems that the community is working on a fix for the next 1.11 bugfix release (and for 1.12). You can follow the status of the ticket here:

https://issues.apache.org/jira/browse/FLINK-18934

Best,
Robert

On Thu, Sep 10, 2020 at 11:00 AM Pierre Bedoucha wrote:
> Hi and thank you for this thread,