`assignTimestampsAndWatermarks` before `keyBy` works:
```java
DataStream<Trip> trips = env.addSource(consumer)
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
        @Override
        public long extractTimestamp(Trip trip) {
            return trip.endTime.getTime();
        }
    });
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
    featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```

But not after `keyBy` and `process`:
```java
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization())
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
        @Override
        public long extractTimestamp(FeaturizedTrip trip) {
            return trip.endTime.getTime();
        }
    });
AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
    featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```
Windows are never triggered.

Is it a bug or expected behavior? If the latter, where is it documented?
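For context on what "never triggered" means: with `timeWindowAll(Time.days(7), Time.days(1))` every element belongs to seven overlapping windows, and an event-time window only fires once the operator's watermark reaches the window's last millisecond. The plain-Java sketch below models that arithmetic only; it does not use the Flink API, the class and method names are hypothetical, and it assumes a zero window offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of sliding event-time windows (size 7 days, slide 1 day,
// offset 0). A sketch of the arithmetic, not Flink code.
class SlidingWindowModel {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Start timestamps of every window [start, start + size) containing the timestamp,
    // latest window first.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window that contains the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // An event-time window fires once the watermark reaches end - 1.
    static boolean fires(long windowStart, long size, long watermark) {
        return watermark >= windowStart + size - 1;
    }
}
```

In other words, if the window operator's watermark never advances past `Long.MIN_VALUE`, no window can fire, whatever data arrives.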
Hi,
Could you check the watermark of the window operator? One possible situation is that some keys are not getting enough input, so their watermarks remain below the window end time and hold back the window operator's watermark. IMO, it’s good practice to assign watermarks earlier in the data pipeline.

Best,
Paul Lam
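The "hold back" effect Paul describes comes from how an operator with several input channels tracks event time: it keeps the latest watermark per channel and its own watermark is the minimum across them. A minimal plain-Java sketch of that rule (hypothetical class name; it ignores details such as idle-channel handling):

```java
import java.util.Arrays;

// Hypothetical model of a multi-input operator's event-time clock.
class MinWatermarkTracker {
    private final long[] channelWatermarks;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No watermark has arrived on any channel yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel; a channel's watermark never moves backwards.
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    // The operator's watermark: the slowest channel holds everyone back.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }
}
```

A channel that never delivers a watermark keeps the result at `Long.MIN_VALUE`, no matter how far the other channels have progressed.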
I don't think it is the watermark. I see the same watermarks from the two versions of code.
The processing on the keyed stream doesn't change event time at all. I can simply change my code to use `map` on the keyed stream to return the input unchanged, so that the window operator receives exactly the same data. The only difference is where I call `assignTimestampsAndWatermarks`. The result is the same: `assignTimestampsAndWatermarks` before `keyBy` works:
```java
DataStream<Trip> trips = env.addSource(consumer)
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
        @Override
        public long extractTimestamp(Trip trip) {
            return trip.endTime.getTime();
        }
    });
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
    featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```

`assignTimestampsAndWatermarks` after `keyBy` doesn't work:
```java
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip)
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
        @Override
        public long extractTimestamp(Trip trip) {
            return trip.endTime.getTime();
        }
    });
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
    featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```

It feels like a bug to me, but I want to confirm before I file a bug report.
Hi,

After `keyBy`, maybe only some of the BoundedOutOfOrdernessTimestampExtractors receive elements (trips). If that is the case, an extractor that receives no elements never sends a watermark, so the `timeWindowAll` operator cannot be triggered.

You could add a `shuffle()` before `assignTimestampsAndWatermarks` in your second case and check whether the window is triggered. If it is, you could then check the distribution of elements generated by the source.

Best,
Guowei
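Guowei's point can be modeled as follows: a bounded-out-of-orderness extractor derives its watermark from the largest timestamp it has seen minus the allowed lateness, so a subtask that has seen nothing has nothing to report. This is a simplified plain-Java model with a hypothetical class name, not Flink's source code:

```java
// Hypothetical model of a bounded-out-of-orderness watermark assigner.
class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private boolean seenAny = false;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called for every element routed to this subtask.
    void onElement(long timestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        seenAny = true;
    }

    // Without any element the watermark stays at Long.MIN_VALUE;
    // otherwise it is the max timestamp minus the allowed out-of-orderness.
    long currentWatermark() {
        return seenAny ? maxTimestampSeen - maxOutOfOrdernessMs : Long.MIN_VALUE;
    }
}
```

Adding `shuffle()` spreads records over all extractor subtasks, so each one sees elements and can advance its watermark.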
Hi,
First of all, thank you for the `shuffle()` tip. It works. However, I still don't understand why it doesn't work without calling `shuffle()`.

Why wouldn't all BoundedOutOfOrdernessTimestampExtractors receive trips? All the trips have keys and timestamps. As I said in my reply to Paul, I see the same watermarks being extracted.

How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy` matter? My understanding is that any specific window for a specific key always receives exactly the same data, and the order of `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.

To make `keyBy` as irrelevant as possible, I tried having it always return the same key, so that there is only one keyed stream and it is exactly the same as the original unkeyed stream. It still doesn't trigger windows:
```java
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip)
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
        @Override
        public long extractTimestamp(Trip trip) {
            return trip.endTime.getTime();
        }
    });
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
    featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```

It makes no sense to me. Please help me understand why it doesn't work. Thanks!
Hi,

A BoundedOutOfOrdernessTimestampExtractor can only send a watermark after it has received at least one element.

After `keyBy`: Flink uses the hash code of the key and the parallelism of the downstream operator to decide which subtask receives each element. This means that if your key is always the same, all the sources send their elements to the same downstream task, for example only BoundedOutOfOrdernessTimestampExtractor no. 3.

Before `keyBy`: in your case, the source and the BoundedOutOfOrdernessTimestampExtractors are chained together, which means every BoundedOutOfOrdernessTimestampExtractor receives elements.

Best,
Guowei
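The routing Guowei describes is deterministic, which is why a constant key sends every record to one subtask. The sketch below uses `Math.floorMod(key.hashCode(), parallelism)` as a hypothetical stand-in; Flink's actual assignment (in `KeyGroupRangeAssignment`) first hashes the key into a key group and then maps key groups to subtasks, but the conclusion about constant keys is the same.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of keyBy routing; the modulo is a simplification.
class KeyRoutingModel {
    // Stand-in for Flink's key-group based subtask selection.
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Set of downstream subtasks that receive at least one record.
    static Set<Integer> subtasksReceivingData(Object[] keys, int parallelism) {
        Set<Integer> used = new HashSet<>();
        for (Object key : keys) {
            used.add(subtaskFor(key, parallelism));
        }
        return used;
    }
}
```

With `keyBy(trip -> 0L)` and parallelism 4, only one of the four extractor subtasks after `keyBy` ever sees a record; the other three stay idle.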
Thanks, I feel I'm getting closer to the truth.
So parallelism is the cause? Say my parallelism is 2. Does that mean I get 2 tasks running after `keyBy`, even if all elements have the same key and therefore go to one downstream task (say task 1)? And it is the other task (task 2), with no incoming data, that prevents the `timeWindowAll` stream from making progress? Because both task 1 and task 2 are its input streams, and one is idle, so its event time cannot advance?
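The scenario in this question can be sketched in plain Java under its stated assumptions: parallelism 2, every record carries the same key, and the extractor runs after `keyBy`. The class and method names are hypothetical; the model only combines "no element means no watermark" with "the window operator takes the minimum over its inputs".

```java
// Hypothetical model: extractor subtasks after keyBy feeding one timeWindowAll instance.
class IdleSubtaskScenario {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Watermark of one extractor subtask, given the timestamps routed to it.
    static long extractorWatermark(long[] timestamps, long boundMs) {
        if (timestamps.length == 0) {
            return Long.MIN_VALUE; // nothing received, so no watermark to emit
        }
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max - boundMs;
    }

    // Watermark seen by the single timeWindowAll instance: minimum over its inputs.
    static long windowOperatorWatermark(long[][] perSubtaskTimestamps, long boundMs) {
        long min = Long.MAX_VALUE;
        for (long[] timestamps : perSubtaskTimestamps) {
            min = Math.min(min, extractorWatermark(timestamps, boundMs));
        }
        return min;
    }
}
```

With all records on task 1 the result stays at `Long.MIN_VALUE`; if the same records are spread over both tasks (as `shuffle()` does), the result is the smaller of the two extractor watermarks and windows can fire.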
Hi,
Yes, I think your explanation is correct. I can also recommend Seth's webinar, where he talks about debugging watermarks [1].

Best,
Dawid

[1] https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
If my understanding is correct, then why does `assignTimestampsAndWatermarks` before `keyBy` work? The `timeWindowAll` stream's input streams are task 1 and task 2, with task 2 idle, no matter whether `assignTimestampsAndWatermarks` comes before or after `keyBy`, because whether task 2 receives elements depends only on the key distribution and has nothing to do with timestamp assignment, right?
                                                       /key 1 trips\
                                                      /             \
(A) trips--> assignTimestampsAndWatermarks-->keyBy                   timeWindowAll
                                                      \     idle    /
                                                       \key 2 trips/

                 /key 1 trips--> assignTimestampsAndWatermarks\
                /                                              \
(B) trips-->keyBy                                               timeWindowAll
                \                    idle                      /
                 \key 2 trips--> assignTimestampsAndWatermarks/

How are things different between A and B from `timeWindowAll`'s perspective?

BTW, thanks for the webinar link, I'll check it later.
> >>>>>> Best, > >>>>>> Paul Lam > >>>>>> > >>>>>>> 在 2019年4月17日,23:04,[hidden email] 写道: > >>>>>>> > >>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works: > >>>>>>> ```java > >>>>>>> DataStream<Trip> trips = > >>>>>>> env.addSource(consumer).assignTimestampsAndWatermarks(new > >>>>> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) { > >>>>>>> @Override > >>>>>>> public long extractTimestamp(Trip trip) { > >>>>>>> return trip.endTime.getTime(); > >>>>>>> } > >>>>>>> }); > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> > >>> trip.userId); > >>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips = > >>> userTrips.process(new > >>>>> Featurization()); > >>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips = > >>>>>>> featurizedUserTrips.timeWindowAll(Time.days(7), > >>>>>>> Time.days(1)); > >>>>>>> ``` > >>>>>>> > >>>>>>> But not after `keyBy` and `process`: > >>>>>>> ```java > >>>>>>> DataStream<Trip> trips = env.addSource(consumer); > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> > >>> trip.userId); > >>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips = > >>>>>>> userTrips.process(new > >>>>> Featurization()).assignTimestampsAndWatermarks(new > >>>>> BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) { > >>>>>>> @Override > >>>>>>> public long extractTimestamp(FeaturizedTrip trip) { > >>>>>>> return trip.endTime.getTime(); > >>>>>>> } > >>>>>>> }); > >>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips = > >>>>>>> featurizedUserTrips.timeWindowAll(Time.days(7), > >>>>>>> Time.days(1)); > >>>>>>> ``` > >>>>>>> Windows are never triggered. > >>>>>>> > >>>>>>> Is it a bug or expected behavior? If the latter, where is it > >>>>> documented? > >>>>>> > > |
Hi,
Watermarks are meta events that travel independently of data events.

1) If you call `assignTimestampsAndWatermarks` before `keyBy`, all parallel instances of `trips` have some data (this is my assumption), so Watermarks can be generated. Afterwards, even if some of the keyed partitions have no data, Watermarks are broadcasted/forwarded anyway. In other words, if at some point Watermarks were generated for all partitions of a single stage, they will be forwarded beyond this point.

2) If you call `assignTimestampsAndWatermarks` after `keyBy`, you try to assign watermarks for an empty partition, which produces no Watermarks at all for this partition; therefore there is no progress beyond this point.

I hope this clarifies it a bit.

Best,

Dawid

On 25/04/2019 16:49, an0 wrote:
|
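Dawid's two cases can be made concrete with a small plain-Java simulation. It is a sketch, not Flink code: the class and method names are hypothetical, and it assumes parallelism 2, a single key (as in the `keyBy(trip -> 0L)` experiment), and assigner instances that behave like `BoundedOutOfOrdernessTimestampExtractor` (watermark = highest timestamp seen minus the bound, nothing emitted before the first element). Because watermarks are broadcast downstream and each operator takes the minimum of its inputs, the single `timeWindowAll` instance ends up with the minimum over all assigner instances in every variant; the only thing that changes is which records each assigner instance sees.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation, not Flink code. All names are hypothetical; it only
// models the watermark rules discussed in this thread.
public class WatermarkPlacement {

    // One bounded-out-of-orderness assigner instance: watermark = max timestamp
    // seen minus the bound; it has nothing to emit before its first element.
    static long assignerWatermark(List<Long> timestamps, long bound) {
        if (timestamps.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long maxTs = Long.MIN_VALUE;
        for (long ts : timestamps) {
            maxTs = Math.max(maxTs, ts);
        }
        return maxTs - bound;
    }

    // Watermarks are broadcast downstream and every operator keeps the minimum
    // of its inputs, so the single timeWindowAll instance ends up with the
    // minimum over all parallel assigner instances.
    static long windowWatermark(List<List<Long>> inputPerAssigner, long bound) {
        long min = Long.MAX_VALUE;
        for (List<Long> input : inputPerAssigner) {
            min = Math.min(min, assignerWatermark(input, bound));
        }
        return min;
    }

    // Simplified keyBy: key hash modulo parallelism. Flink really uses key
    // groups, but the mapping is equally deterministic per key.
    static List<List<Long>> partitionByKey(long[][] records, int parallelism) {
        List<List<Long>> parts = emptyPartitions(parallelism);
        for (long[] r : records) { // r[0] = key, r[1] = event timestamp
            parts.get(Math.floorMod(Long.hashCode(r[0]), parallelism)).add(r[1]);
        }
        return parts;
    }

    // Simplified round-robin redistribution, in the spirit of shuffle()/rebalance().
    static List<List<Long>> roundRobin(long[][] records, int parallelism) {
        List<List<Long>> parts = emptyPartitions(parallelism);
        for (int i = 0; i < records.length; i++) {
            parts.get(i % parallelism).add(records[i][1]);
        }
        return parts;
    }

    static List<List<Long>> emptyPartitions(int n) {
        List<List<Long>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            parts.add(new ArrayList<>());
        }
        return parts;
    }

    public static void main(String[] args) {
        long bound = 10;
        // Every record has key 0, as in keyBy(trip -> 0L); parallelism is 2.
        long[][] all = {{0, 100}, {0, 120}, {0, 110}, {0, 130}};

        // (A) assigner chained to each of two source subtasks, before keyBy.
        List<List<Long>> a = Arrays.asList(
                Arrays.asList(100L, 120L), Arrays.asList(110L, 130L));
        // (B) assigner after keyBy: subtask 1 never receives an element.
        List<List<Long>> b = partitionByKey(all, 2);
        // (B') redistribute before assigning, as with the shuffle() tip.
        List<List<Long>> c = roundRobin(all, 2);

        System.out.println("A:  " + windowWatermark(a, bound));
        System.out.println("B:  " + windowWatermark(b, bound));
        System.out.println("B': " + windowWatermark(c, bound));
    }
}
```

Run as-is, it prints 110 for A, `Long.MIN_VALUE` for B, and 100 for B'. The idle assigner subtask in B is what pins the window operator's watermark. The simplified hash routing only stands in for Flink's key-group assignment; both are deterministic per key, so a single key still lands on a single subtask.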
Thanks very much. It definitely explains the problem I'm seeing. However, there is something I need to confirm:
You say "Watermarks are broadcasted/forwarded anyway." Do you mean that, in `assignTimestampsAndWatermarks.keyBy.window`, it doesn't matter what data flows through a specific key's stream, and all key streams have the same watermarks? So, time-wise, `window` behaves as if `keyBy` were not there at all?

On 2019/04/26 06:34:10, Dawid Wysakowicz <[hidden email]> wrote:
|
Hi An0:

Here is my understanding: each operator's watermark is the lowest of the watermarks of all its input streams. When the watermark of one of its inputs is updated, the lowest input watermark becomes the new watermark for that operator and is forwarded to the operator's output streams. So, if one of the input streams' watermarks is not updated, it can prevent the operator's watermark from moving forward, thereby holding back the watermark emitted to the following operators.

Here is the description of how watermarks work: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams

Hope that helps.
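The minimum-of-inputs rule described above can be sketched as a small tracker. This is a hypothetical plain-Java class, not Flink's internal implementation: it keeps the latest watermark per input channel (per-channel watermarks only move forward) and reports a new operator watermark only when the minimum over all channels advances. A channel that never sends a watermark keeps the minimum at `Long.MIN_VALUE`, which is what the idle assigner subtask does in case (B) of the diagram.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical helper, not Flink's internal class: it tracks the latest
// watermark per input channel and reports a new operator watermark only when
// the minimum over all channels advances.
public class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    public MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // A channel that has never sent a watermark counts as Long.MIN_VALUE.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one input channel and returns the watermark to
    // forward downstream, or empty if the operator's watermark did not advance.
    public OptionalLong onWatermark(int channel, long watermark) {
        // Per-channel watermarks only move forward.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        if (min > currentWatermark) {
            currentWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        // Two upstream channels, e.g. two assigner subtasks feeding timeWindowAll.
        MinWatermarkTracker window = new MinWatermarkTracker(2);
        System.out.println(window.onWatermark(0, 100)); // channel 1 silent: empty
        System.out.println(window.onWatermark(0, 200)); // still held back: empty
        System.out.println(window.onWatermark(1, 150)); // minimum advances to 150
        System.out.println(window.currentWatermark());
    }
}
```

If channel 1 never calls `onWatermark` at all, `currentWatermark()` stays at `Long.MIN_VALUE` no matter how far channel 0 advances, so event-time windows downstream never fire.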
On Monday, April 29, 2019, 2:06:12 PM EDT, an0 <[hidden email]> wrote:
|
An operator task broadcasts its current watermark to all downstream tasks that might receive its records. If you have the following code:

    DataStream<X> a = ...
    a.map(A).map(B).keyBy(....).window(C)

and execute this with parallelism 2, your plan looks like this:

    A.1 -- B.1 --\--/-- C.1
                  X
    A.2 -- B.2 --/--\-- C.2

A.1 will propagate its watermarks to B.1 because only B.1 will receive its output events. However, B.1 will propagate its watermarks to C.1 and C.2 because the output of B.1 is partitioned and all C tasks might receive output events from B.1.

Best, Fabian

On Mon, 29 Apr 2019 at 20:06, an0 <[hidden email]> wrote:
Thanks very much. It definitely explains the problem I'm seeing. However, something I need to confirm: |
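The routing rule described above (a forward edge sends watermarks to a single downstream subtask, while a partitioned edge such as `keyBy` broadcasts them to every downstream subtask) can be modelled in a few lines of plain Java. This is a hypothetical sketch, not Flink's runtime code: `WatermarkRouting`, `Connection`, `watermarkTargets` and `describe` are invented names, and the example assumes A, B and C all run with the same parallelism.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (plain Java, not Flink's runtime classes): which downstream
// subtasks receive the watermarks emitted by a given upstream subtask.
public class WatermarkRouting {

    public enum Connection { FORWARD, PARTITIONED }

    /** Indices of downstream subtasks that get watermarks from upstream subtask {@code upstream}. */
    public static List<Integer> watermarkTargets(Connection connection, int upstream, int downstreamParallelism) {
        List<Integer> targets = new ArrayList<>();
        if (connection == Connection.FORWARD) {
            // One-to-one channel (e.g. map -> map): only the same-index subtask can receive records.
            targets.add(upstream);
        } else {
            // Partitioned channel (e.g. keyBy): any downstream subtask might receive a record,
            // so the watermark is broadcast to all of them, whether or not records follow.
            for (int i = 0; i < downstreamParallelism; i++) {
                targets.add(i);
            }
        }
        return targets;
    }

    /** Renders the watermark edges between two operators using 1-based subtask names. */
    public static String describe(String from, String to, Connection connection, int parallelism) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < parallelism; i++) {
            List<String> names = new ArrayList<>();
            for (int t : watermarkTargets(connection, i, parallelism)) {
                names.add(to + "." + (t + 1));
            }
            sb.append(from).append('.').append(i + 1).append(" -> ").append(names).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // a.map(A).map(B).keyBy(...).window(C) with parallelism 2
        System.out.print(describe("A", "B", Connection.FORWARD, 2));
        System.out.print(describe("B", "C", Connection.PARTITIONED, 2));
        // A.1 -> [B.1]
        // A.2 -> [B.2]
        // B.1 -> [C.1, C.2]
        // B.2 -> [C.1, C.2]
    }
}
```

In this model the broadcast on the partitioned edge happens regardless of where records actually go, which is why a keyed subtask that receives no records still sees watermarks from upstream.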
This explanation is exactly what I'm looking for, thanks! Is such an important rule documented anywhere in the official documentation?
On 2019/04/30 08:47:29, Fabian Hueske <[hidden email]> wrote: |
Hi,

this should be covered here: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams

Best, Fabian

On Thu, 2 May 2019 at 17:48, an0 <[hidden email]> wrote:
This explanation is exactly what I'm looking for, thanks! Is such an important rule documented anywhere in the official document? |
Thanks, but it doesn't seem to cover this rule:

--- Quote
Watermarks are generated at, or directly after, source functions. Each parallel subtask of a source function usually generates its watermarks independently. These watermarks define the event time at that particular parallel source.

As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an operator advances its event time, it generates a new watermark downstream for its successor operators.

Some operators consume multiple input streams; a union, for example, or operators following a keyBy(…) or partition(…) function. Such an operator’s current event time is the minimum of its input streams’ event times. As its input streams update their event times, so does the operator.
--- End Quote

The most relevant part, I believe, is this: "Some operators consume multiple input streams… operators following a keyBy(…) function. Such an operator’s current event time is the minimum of its input streams’ event times."

But the wording "current event time is the minimum of its input streams’ event times" actually implies that the input streams (produced by keyBy) have different watermarks, the exact opposite of what you just explained.

On 2019/05/03 07:32:07, Fabian Hueske <[hidden email]> wrote: |
Hi,

Please find my response below.

On Fri, May 3, 2019 at 16:16, an0 <[hidden email]> wrote:
> Thanks, but it doesn't seem to cover this rule:

IMO, the description in the documentation is correct, but it looks at the issue from a different angle. An operator task typically has many inputs from which it receives records. An operator task might be connected to one, some, or all tasks of its input operators. This depends on:

- the number of input operators (one or more), and
- the connection between the operator and its input operators (forward, partition, broadcast).

Each input task can send a different watermark, but each task also sends the same watermark to all of its output tasks.

So, this is a matter of distinguishing between receiving (different) watermarks and emitting (the same) watermarks.

Best,
Fabian

On 2019/05/03 07:32:07, Fabian Hueske <[hidden email]> wrote:
|
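The distinction between receiving different watermarks and emitting one watermark can be sketched as a small, self-contained Java model.

- `WatermarkCombiner` and `WatermarkCombinerDemo` are hypothetical names, not Flink API.
- The model assumes each input channel delivers non-decreasing watermarks.
- It ignores idle inputs, which Flink's runtime also takes into account.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Toy model of one operator task with several input channels (e.g. a task after keyBy).
// NOT Flink's implementation; it only illustrates "receive different watermarks,
// emit the minimum to all outputs".
final class WatermarkCombiner {
    private final long[] channelWatermarks;    // highest watermark seen on each input channel
    private long lastEmitted = Long.MIN_VALUE; // watermark this task last sent downstream

    WatermarkCombiner(int numInputChannels) {
        channelWatermarks = new long[numInputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // Records a watermark from one input channel. Returns the task's new watermark if it
    // advanced; that single value would be sent to every downstream task.
    OptionalLong onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > lastEmitted) {
            lastEmitted = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    long currentWatermark() {
        return lastEmitted;
    }
}

final class WatermarkCombinerDemo {
    public static void main(String[] args) {
        // C.1 after keyBy, fed by B.1 (channel 0) and B.2 (channel 1).
        WatermarkCombiner c1 = new WatermarkCombiner(2);
        System.out.println(c1.onWatermark(0, 100)); // OptionalLong.empty: B.2 has not reported yet
        System.out.println(c1.onWatermark(1, 80));  // OptionalLong[80]
        System.out.println(c1.onWatermark(1, 120)); // OptionalLong[100]
    }
}
```

In this model, C.1 only moves forward once every upstream channel has reported. After that it emits one value, the minimum, which would go to all of its downstream tasks.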
You are right, thanks. But something is still not totally clear to me. I'll reuse your diagram with a little modification:
    DataStream<X> a = ...
    a.map(A).map(B).keyBy(....).timeWindow(C)

and execute this with parallelism 2. However, keyBy only generates one single key value, and assume all records go to C.1. Does the data flow look like this?

    A.1 -- B.1 -----/-- C.1
                   /
    A.2 -- B.2 --/      C.2

Will the lack of data into C.2 prevent C.1's windows from firing? Will the location of assignTimestampsAndWatermarks (after a, after map(A), after map(B), after keyBy) matter for the firing of C.1's windows?

By my understanding, the answers are "no" and "no". Correct?

Now comes the *silly* question: does C.2 exist at all? Since there is only one key value, only one C instance is needed. I can see that C.2 may exist as a physical instance, but as a logical instance it shouldn't exist in the diagram because it is unused. I feel the answer to this silly question may be the key to my (and perhaps many others') misunderstanding of situations like this.

If C.2 exists just because parallelism is set to 2, even though it is not logically needed, and it also serves as an input to the next operator if there is one, then the mystery is completely solved for me.

Use a concrete example:

    DataStream<X> a = ...
    a.map(A).map(B).keyBy(....).assignTimestampsAndWatermarks(C).timeWindowAll(D)

    A.1 -- B.1 -----/-- C.1 --\
                   /           D
    A.2 -- B.2 --/      C.2 --/

D's watermark cannot progress because C.2's watermark cannot progress, because C.2 doesn't have any input data. Even though C.2 is not logically needed, it does exist, and it ruins everything :p. Is this understanding correct?

On 2019/05/09 10:01:44, Fabian Hueske <[hidden email]> wrote:
|
Hi,

Again, answers below ;-)

On Thu, May 9, 2019 at 17:12, an0 <[hidden email]> wrote:
> You are right, thanks. But something is still not totally clear to me. I'll reuse your diagram with a little modification:
> Will the lack of data into C.2 prevent C.1's windows from firing? Will the location of assignTimestampsAndWatermarks (after a, after map(A), after map(B), after keyBy) matter for the firing of C.1's windows?
> By my understanding, the answers are "no" and "no". Correct?

Q1: No. Watermarks are propagated even with a skewed key distribution. C.2 will also advance its event-time clock (based on the WMs) and forward new WMs.

Q2: After a, map(A), and map(B) would work fine. Assigning watermarks immediately after a keyBy() is not a good idea, because:

1) the records are shuffled and it's hard to reason about ordering, and
2) you lose the KeyedStream property and would have to keyBy() again (unless you use reinterpretAsKeyedStream).

> Now comes the *silly* question: does C.2 exist at all? Since there is only one key value, only one C instance is needed. I can see that C.2 may exist as a physical instance, but as a logical instance it shouldn't exist in the diagram because it is unused. I feel the answer to this silly question may be the key to my (and perhaps many others') misunderstanding of situations like this.

C.2 exists. Flink does not create a flow topology based on data values. As soon as a record arrives with a key that needs to go to C.2, we would need it.

> Use a concrete example:
> a.map(A).map(B).keyBy(....).assignTimestampsAndWatermarks(C).timeWindowAll(D)
> D's watermark cannot progress because C.2's watermark cannot progress, because C.2 doesn't have any input data. [...] Is this understanding correct?

Although C.2 does not receive data, it receives watermarks because WMs are broadcast. They flow to any task that is reachable by any event. The case that all keys fall into C.1 is not important, because a record for C.2 might arrive at any point in time. Also, Flink does not give any guarantees about how keys (or rather key groups) are assigned to tasks. If you rescale the application to a parallelism of 3, the active key group might be scheduled to C.2 or C.3.

Long story short, D makes progress in event time because watermarks are broadcast.
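The point that C.2 exists regardless of the data can be illustrated with a rough sketch of keyed routing. It follows a two-step scheme similar in shape to Flink's key-group assignment: key, then key group, then subtask.

Unlike Flink, which applies MurmurHash to `key.hashCode()`, it uses `Math.floorMod(key.hashCode(), maxParallelism)` directly. Concrete subtask numbers will therefore not match a real job. The following are illustrative assumptions, not Flink API:

- the class `KeyRoutingSketch`,
- the `maxParallelism` value of 128,
- the single key `100L`.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of keyed routing (key -> key group -> subtask).
// Flink hashes key.hashCode() with MurmurHash; this sketch uses floorMod directly,
// so the concrete subtask indices differ from a real Flink job.
final class KeyRoutingSketch {
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        // Each subtask owns a contiguous range of key groups.
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    // Counts records per subtask. Every subtask index is present because the number
    // of subtasks comes from the parallelism, not from the keys actually seen.
    static Map<Integer, Integer> recordsPerSubtask(Iterable<?> keys, int maxParallelism, int parallelism) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int i = 0; i < parallelism; i++) {
            counts.put(i, 0);
        }
        for (Object key : keys) {
            counts.merge(subtaskFor(key, maxParallelism, parallelism), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // A single key value (like keyBy(trip -> 100L)) repeated for three records.
        Iterable<Long> keys = Arrays.asList(100L, 100L, 100L);
        System.out.println(recordsPerSubtask(keys, 128, 2)); // {0=0, 1=3}
        System.out.println(recordsPerSubtask(keys, 128, 3)); // {0=0, 1=0, 2=3}
    }
}
```

In this toy mapping, the single key lands on subtask index 1 (C.2) at parallelism 2 and on index 2 (C.3) at parallelism 3. The other subtasks still exist with zero records, and in a real job they would still emit watermarks downstream.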
|
> Q2: After a, map(A), and map(B) would work fine. Assigning watermarks
> immediately after a keyBy() is not a good idea, because 1) the records are
> shuffled and it's hard to reason about ordering, and 2) you lose the
> KeyedStream property and would have to keyBy() again (unless you use
> reinterpretAsKeyedStream).

I know it is better to call assignTimestampsAndWatermarks as early as possible. I intentionally put it after keyBy to check my understanding of this specific situation, because I may have to use assignTimestampsAndWatermarks after keyBy in my application. I didn't make the intention of my question clear, though. I'm glad to learn another trick from you: reinterpretAsKeyedStream :).

Let's assume the pipeline is keyBy.assignTimestampsAndWatermarks.reinterpretAsKeyedStream.timeWindow(C). What I wanted to ask is this. Because it uses keyed windows, every key's window fires independently. So even if I call assignTimestampsAndWatermarks after keyBy, and C.2 has no data and therefore generates no watermarks, that won't affect the firing of C.1's windows. Is this understanding correct?

> Although C.2 does not receive data, it receives watermarks because WMs are
> broadcast. They flow to any task that is reachable by any event. The case
> that all keys fall into C.1 is not important, because a record for C.2 might
> arrive at any point in time. Also, Flink does not give any guarantees
> about how keys (or rather key groups) are assigned to tasks. If you rescale
> the application to a parallelism of 3, the active key group might be
> scheduled to C.2 or C.3.
>
> Long story short, D makes progress in event time because watermarks are
> broadcast.

I know watermarks are broadcast. But I'm using assignTimestampsAndWatermarks after keyBy, so C.2 doesn't receive watermarks; it creates watermarks from the data it receives. Since it doesn't receive any data, it doesn't create any watermarks. D can't make progress because one of its inputs, C.2, doesn't make progress. Is this understanding correct?
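The reasoning about D can be checked against a small toy model, built on these assumptions:

- Each assigner instance derives a watermark only from records it has processed. This mirrors a bounded-out-of-orderness extractor in spirit, not its exact code.
- All records share one key and land on one keyed subtask.
- In the "before keyBy" case, a single source subtask sees every record.

`AssignerPlacementSketch` and its methods are hypothetical names, not Flink API.

```java
import java.util.Arrays;
import java.util.List;

// Toy model (not Flink code) of where a bounded-out-of-orderness assigner runs relative
// to keyBy, and what a single downstream timeWindowAll task (D) sees as its watermark.
final class AssignerPlacementSketch {

    // One assigner instance: it can only derive a watermark from records it has seen.
    static final class Assigner {
        private final long maxOutOfOrderness;
        private long maxTimestamp = Long.MIN_VALUE;

        Assigner(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        void onRecord(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        long currentWatermark() {
            // No records yet: no usable watermark, modelled as Long.MIN_VALUE.
            return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - maxOutOfOrderness;
        }
    }

    // D's event time is the minimum over the watermarks of all its input subtasks.
    static long downstreamWatermark(long[] upstream) {
        return Arrays.stream(upstream).min().getAsLong();
    }

    // Assigner after keyBy: one assigner per keyed subtask, all records on hotSubtask.
    static long assignAfterKeyBy(List<Long> timestamps, int parallelism, int hotSubtask, long bound) {
        long[] watermarks = new long[parallelism];
        for (int i = 0; i < parallelism; i++) {
            Assigner assigner = new Assigner(bound);
            if (i == hotSubtask) {
                for (long ts : timestamps) {
                    assigner.onRecord(ts);
                }
            }
            watermarks[i] = assigner.currentWatermark();
        }
        return downstreamWatermark(watermarks);
    }

    // Assigner before keyBy (one source subtask seeing every record): its watermark is
    // broadcast to all keyed subtasks, which forward it to D even without data.
    static long assignBeforeKeyBy(List<Long> timestamps, int parallelism, long bound) {
        Assigner assigner = new Assigner(bound);
        for (long ts : timestamps) {
            assigner.onRecord(ts);
        }
        long[] watermarks = new long[parallelism];
        Arrays.fill(watermarks, assigner.currentWatermark());
        return downstreamWatermark(watermarks);
    }

    public static void main(String[] args) {
        List<Long> timestamps = Arrays.asList(1_000L, 5_000L, 3_000L);
        System.out.println(assignAfterKeyBy(timestamps, 2, 0, 1_000L)); // -9223372036854775808
        System.out.println(assignBeforeKeyBy(timestamps, 2, 1_000L));   // 4000
    }
}
```

Under these assumptions, the assigner-after-keyBy variant leaves D stuck at `Long.MIN_VALUE`, so no event-time window can fire there. The assigner-before-keyBy variant lets D advance to 4000 (5000 minus the 1000 ms bound).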
On 2019/05/10 10:38:35, Fabian Hueske <[hidden email]> wrote: > Hi, > > Again answers below ;-) > > Am Do., 9. Mai 2019 um 17:12 Uhr schrieb an0 <[hidden email]>: > > > You are right, thanks. But something is still not totally clear to me. > > I'll reuse your diagram with a little modification: > > > > DataStream<X> a = ... > > a.map(A).map(B).keyBy(....).timeWindow(C) > > > > and execute this with parallelism 2. However, keyBy only generates one > > single key value, and assume they all go to C.1. Does the data flow look > > like this? > > > > A.1 -- B.1 -----/-- C.1 > > / > > A.2 -- B.2 --/ C.2 > > > > Will the lack of data into C.2 prevent C.1's windows from firing? Will the > > location of assignTimestampsAndWatermarks(after a, after map(A), after > > map(B), after keyBy) matter for the firing of C.1's windows > > By my understanding, the answers are "no" and "no". Correct? > > > > Q1: no. Watermarks are propagated even in case of skewed key distribution. > C.2 will also advance it's event-time clock (based on the WMs) and forward > new WMs. > Q2: after a, map(A), and map(B) would work fine. Assign watermarks > immediatedly after a keyBy() is not a good idea, because 1) the records are > shuffled and it's hard to reasoning about ordering, and 2) you lose the > KeyedStream property and would have to keyBy() again (unless you use > interpreteAsKeyedStream). > > > > Now comes the *silly* question: does C.2 exist at all? Since there is only > > one key value, only one C instance is needed. I could see that C.2 as a > > physical instance may exist, but as a logical instance it shouldn't exist > > in the diagram because it is unused. I feel the answer to this silly > > question may be the most important in understanding my and(perhaps many > > others') misunderstanding of situations like this. 
> > > > If C.2 exists just because parallelism is set to 2, even though it is not > > logically needed, and it also serves as an input to the next operator if > > there is one, then the mystery is completely solved for me. > > > > C.2 exists. Flink does not create a flow topology based on data values. As > soon as there is a record with a key that would need to go to C.2, we would > need it. > > > > Use a concrete example: > > > > DataStream<X> a = ... > > > > a.map(A).map(B).keyBy(....).assignTimestampsAndWatermarks(C).timeWindowAll(D) > > > > A.1 -- B.1 -----/-- C.1 --\ > > / D > > A.2 -- B.2 --/ C.2 --/ > > > > D's watermark can not progress because C.2's watermark can not progress, > > because C.2 doesn't have any input data, even though C.2 is not logically > > needed but it does exist and it ruins everything :p. Is this understanding > > correct? > > > > Although C.2 does not receive data, it receives watermarks because WMs are > broadcasted. They flow to any task that is reachable by any event. The case > that all keys fall into C.1 is not important because a record for C.2 might > arrive at any point in time. Also Flink does does not give any guarantees > about how keys (or rather key groups) are assigned to tasks. If you rescale > the application to a parallelism of 3, the active key group might be > scheduled to C.2 or C.3. > > Long story short, D makes progress in event time because watermarks are > broadcasted. > > > > > > On 2019/05/09 10:01:44, Fabian Hueske <[hidden email]> wrote: > > > Hi, > > > > > > Please find my response below. > > > > > > Am Fr., 3. Mai 2019 um 16:16 Uhr schrieb an0 <[hidden email]>: > > > > > > > Thanks, but it does't seem covering this rule: > > > > --- Quote > > > > Watermarks are generated at, or directly after, source functions. Each > > > > parallel subtask of a source function usually generates its watermarks > > > > independently. These watermarks define the event time at that > > particular > > > > parallel source. 
> > > > > > > > As the watermarks flow through the streaming program, they advance the > > > > event time at the operators where they arrive. Whenever an operator > > > > advances its event time, it generates a new watermark downstream for > > its > > > > successor operators. > > > > > > > > Some operators consume multiple input streams; a union, for example, or > > > > operators following a keyBy(…) or partition(…) function. Such an > > operator’s > > > > current event time is the minimum of its input streams’ event times. > > As its > > > > input streams update their event times, so does the operator. > > > > --- End Quote > > > > > > > > The most relevant part, I believe, is this: > > > > "Some operators consume multiple input streams…operators following a > > > > keyBy(…) function. Such an operator’s current event time is the > > minimum of > > > > its input streams’ event times." > > > > > > > > But the wording of "current event time is the minimum of its input > > > > streams’ event times" actually implies that the input streams(produced > > by > > > > keyBy) have different watermarks, the exactly opposite of what you just > > > > explained. > > > > > > > > > > > IMO, the description in the documentation is correct, but looks at the > > > issue from a different angle. > > > An operator task typically has many input from which it receives records. > > > Depending on the number of input operators (one ore more) and the > > > connection between the operator and its input operators (forward, > > > partition, broadcast), an operator task might have a connection to one, > > > some, or all tasks of its input operators. Each input task can send a > > > different watermark, but each task will also send the same watermark to > > all > > > its output tasks. > > > > > > So, this is a matter of distinguishing receiving (different) watermarks > > and > > > emitting (the same) watermarks. 
> > > > > > Best, Fabian > > > > > > > > > > On 2019/05/03 07:32:07, Fabian Hueske <[hidden email]> wrote: > > > > > Hi, > > > > > > > > > > this should be covered here: > > > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams > > > > > > > > > > Best, Fabian > > > > > > > > > > Am Do., 2. Mai 2019 um 17:48 Uhr schrieb an0 <[hidden email]>: > > > > > > > > > > > This explanation is exactly what I'm looking for, thanks! Is such > > an > > > > > > important rule documented anywhere in the official document? > > > > > > > > > > > > On 2019/04/30 08:47:29, Fabian Hueske <[hidden email]> wrote: > > > > > > > An operator task broadcasts its current watermark to all > > downstream > > > > tasks > > > > > > > that might receive its records. > > > > > > > If you have an the following code: > > > > > > > > > > > > > > DataStream<X> a = ... > > > > > > > a.map(A).map(B).keyBy(....).window(C) > > > > > > > > > > > > > > and execute this with parallelism 2, your plan looks like this > > > > > > > > > > > > > > A.1 -- B.1 --\--/-- C.1 > > > > > > > X > > > > > > > A.2 -- B.2 --/--\-- C.2 > > > > > > > > > > > > > > A.1 will propagate its watermarks to B.1 because only B.1 will > > > > receive > > > > > > its > > > > > > > output events. > > > > > > > However, B.1 will propagate its watermarks to C.1 and C.2 > > because the > > > > > > > output of B.1 is partitioned and all C tasks might receive output > > > > events > > > > > > > from B.1. > > > > > > > > > > > > > > Best, Fabian > > > > > > > > > > > > > > Am Mo., 29. Apr. 2019 um 20:06 Uhr schrieb an0 <[hidden email] > > >: > > > > > > > > > > > > > > > Thanks very much. It definitely explains the problem I'm > > seeing. > > > > > > However, > > > > > > > > something I need to confirm: > > > > > > > > You say "Watermarks are broadcasted/forwarded anyway." 
> > > > Do you mean that, in assignTimestampsAndWatermarks.keyBy.window, it doesn't matter what data flows through a specific key's stream, and all key streams have the same watermarks? So time-wise, `window` behaves as if `keyBy` were not there at all?
> > > >
> > > > On 2019/04/26 06:34:10, Dawid Wysakowicz <[hidden email]> wrote:
> > > > Hi,
> > > >
> > > > Watermarks are meta events that travel independently of data events.
> > > >
> > > > 1) If you assignTimestampsAndWatermarks before keyBy, all parallel instances of trips have some data (this is my assumption), so Watermarks can be generated. Afterwards, even if some of the keyed partitions have no data, Watermarks are broadcasted/forwarded anyway. In other words, if at some point Watermarks were generated for all partitions of a single stage, they will be forwarded beyond this point.
> > > >
> > > > 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign watermarks for an empty partition, which produces no Watermarks at all for this partition; therefore there is no progress beyond this point.
> > > >
> > > > I hope this clarifies it a bit.
> > > >
> > > > Best,
> > > >
> > > > Dawid
> > > >
> > > > On 25/04/2019 16:49, an0 wrote:
> > > > If my understanding is correct, then why does `assignTimestampsAndWatermarks` before `keyBy` work?
> > > > The `timeWindowAll` stream's input streams are task 1 and task 2, with task 2 idling, no matter whether `assignTimestampsAndWatermarks` is before or after `keyBy`, because whether task 2 receives elements depends only on the key distribution and has nothing to do with timestamp assignment, right?
> > > >
> > > >                                                   /key 1 trips\
> > > >                                                  /             \
> > > > (A) trips-->assignTimestampsAndWatermarks-->keyBy               timeWindowAll
> > > >                                                  \    idle     /
> > > >                                                   \key 2 trips/
> > > >
> > > >                   /key 1 trips-->assignTimestampsAndWatermarks\
> > > >                  /                                             \
> > > > (B) trips-->keyBy                                               timeWindowAll
> > > >                  \    idle                                     /
> > > >                   \key 2 trips-->assignTimestampsAndWatermarks/
> > > >
> > > > How are things different between A and B from `timeWindowAll`'s perspective?
> > > >
> > > > BTW, thanks for the webinar link, I'll check it later.
> > > >
> > > > On 2019/04/25 08:30:20, Dawid Wysakowicz <[hidden email]> wrote:
> > > > Hi,
> > > >
> > > > Yes, I think your explanation is correct.
> > > > I can also recommend Seth's webinar where he talks about debugging Watermarks[1].
> > > >
> > > > Best,
> > > >
> > > > Dawid
> > > >
> > > > [1] https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> > > >
> > > > On 22/04/2019 22:55, an0 wrote:
> > > > Thanks, I feel I'm getting closer to the truth.
> > > >
> > > > So parallelism is the cause? Say my parallelism is 2. Does that mean I get 2 tasks running after `keyBy` even if all elements have the same key and so go to 1 downstream task (say task 1)? And is it the other task (task 2), with no incoming data, that leaves the `timeWindowAll` stream unable to progress? Because both task 1 and task 2 are its input streams and one is idling, so its event time cannot make progress?
> > > >
> > > > On 2019/04/22 01:57:39, Guowei Ma <[hidden email]> wrote:
> > > > Hi,
> > > >
> > > > A BoundedOutOfOrdernessTimestampExtractor can send a WM only after it has received at least one element.
> > > >
> > > > After keyBy:
> > > > Flink uses the hash code of the key and the parallelism of the downstream operator to decide which subtask receives the element.
> > > > This means that if your key is always the same, all the sources will send the elements only to the same downstream task, for example only to BoundedOutOfOrdernessTimestampExtractor no. 3.
> > > >
> > > > Before keyBy:
> > > > In your case, the Source and the BoundedOutOfOrdernessTimestampExtractors would be chained together, which means every BoundedOutOfOrdernessTimestampExtractor will receive elements.
> > > >
> > > > Best,
> > > > Guowei
> > > >
> > > > On Fri, Apr 19, 2019 at 10:41 PM, an0 <[hidden email]> wrote:
> > > >
> > > > Hi,
> > > >
> > > > First of all, thank you for the `shuffle()` tip. It works. However, I still don't understand why it doesn't work without calling `shuffle()`.
> > > >
> > > > Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips? All the trips have keys and timestamps. As I said in my reply to Paul, I see the same watermarks being extracted.
> > > >
> > > > How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy` matter? My understanding is that any specific window for a specific key always receives exactly the same data, and the calling order of `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
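Guowei's point about key routing can be illustrated with a simplified sketch. Flink does not literally compute `hashCode() % parallelism`; it hashes the key's `hashCode()` into key groups and then maps key groups to subtasks (see `KeyGroupRangeAssignment`), so real subtask indices will differ. The plain modulo below is an assumption for illustration, and `KeyRoutingSketch` is a hypothetical name. The property that carries over is the one that matters in this thread: routing is deterministic per key, so a constant key such as `0L` always lands on one subtask and the others receive nothing.

```java
import java.util.Arrays;

/**
 * Simplified, hypothetical key routing: subtask = floorMod(key.hashCode(), parallelism).
 * Flink's real assignment goes through key groups, but it is equally deterministic per key.
 */
class KeyRoutingSketch {
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Counts how many elements with the given keys each downstream subtask receives. */
    static int[] elementsPerSubtask(Object[] keys, int parallelism) {
        int[] counts = new int[parallelism];
        for (Object key : keys) {
            counts[subtaskFor(key, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Mirrors keyBy(trip -> 0L): every trip carries the same key.
        Object[] constantKeys = {0L, 0L, 0L, 0L};
        System.out.println(Arrays.toString(elementsPerSubtask(constantKeys, 2))); // [4, 0]
    }
}
```

With parallelism 2, a watermark assigner placed after such a `keyBy` on the second subtask never sees an element, which is the idle input discussed above.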
> > > >
> > > > To make `keyBy` as irrelevant as possible, I tried letting it always return the same key so that there is only 1 keyed stream and it is exactly the same as the original unkeyed stream. It still doesn't trigger windows:
> > > > ```java
> > > > DataStream<Trip> trips = env.addSource(consumer);
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> > > > DataStream<Trip> featurizedUserTrips =
> > > >         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >     @Override
> > > >     public long extractTimestamp(Trip trip) {
> > > >         return trip.endTime.getTime();
> > > >     }
> > > > });
> > > > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > >
> > > > It makes no sense to me. Please help me understand why it doesn't work. Thanks!
> > > >
> > > > On 2019/04/19 04:14:31, Guowei Ma <[hidden email]> wrote:
> > > > Hi,
> > > > After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors receive the elements (trips).
> > > > If that is the case, a BoundedOutOfOrdernessTimestampExtractor that does not receive any element will not send a WM, so the timeWindowAll operator cannot be triggered.
> > > > You could add a shuffle() before the assignTimestampsAndWatermarks in your second case and check whether the window is triggered. If it is, you could then check the distribution of elements generated by the source.
> > > >
> > > > Best,
> > > > Guowei
> > > >
> > > > On Fri, Apr 19, 2019 at 4:10 AM, [hidden email] <[hidden email]> wrote:
> > > >
> > > > I don't think it is the watermark. I see the same watermarks from the two versions of code.
> > > >
> > > > The processing on the keyed stream doesn't change event time at all. I can simply change my code to use `map` on the keyed stream to return the input data unchanged, so that the window operator receives exactly the same data. The only difference is when I do `assignTimestampsAndWatermarks`.
> > > > The result is the same: `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > ```java
> > > > DataStream<Trip> trips =
> > > >         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >     @Override
> > > >     public long extractTimestamp(Trip trip) {
> > > >         return trip.endTime.getTime();
> > > >     }
> > > > });
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > >
> > > > `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > > ```java
> > > > DataStream<Trip> trips = env.addSource(consumer);
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<Trip> featurizedUserTrips =
> > > >         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >     @Override
> > > >     public long extractTimestamp(Trip trip) {
> > > >         return trip.endTime.getTime();
> > > >     }
> > > > });
> > > > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > >
> > > > It feels like a bug to me, but I want to confirm it before I file the bug report.
> > > >
> > > > On 2019/04/18 03:38:34, Paul Lam <[hidden email]> wrote:
> > > > Hi,
> > > >
> > > > Could you check the watermark of the window operator? One possible situation would be that some of the keys are not getting enough inputs, so their watermarks remain below the window end time and hold the window operator's watermark back. IMO, it’s a good practice to assign watermarks earlier in the data pipeline.
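Paul's remark about the "window end time" can be made concrete for the `timeWindowAll(Time.days(7), Time.days(1))` used throughout this thread. The sketch below is a plain-Java approximation of how I understand Flink's `SlidingEventTimeWindows` (zero offset) and `EventTimeTrigger` to behave: an element belongs to every window `[start, start + 7 days)` whose start is a whole day and which contains its timestamp, and a window fires once the operator's watermark reaches `end - 1` (`TimeWindow.maxTimestamp()`). `SlidingWindowSketch` is a hypothetical name, and non-negative timestamps are assumed.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical approximation (not Flink code) of sliding event-time windows
 * with size 7 days and slide 1 day, plus the event-time firing rule.
 */
class SlidingWindowSketch {
    static final long DAY = 24L * 60 * 60 * 1000;
    static final long SIZE = 7 * DAY;
    static final long SLIDE = DAY;

    /** Returns the [start, end) windows (as long[2]) an element with this timestamp belongs to, latest first. */
    static List<long[]> assignWindows(long timestamp) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp (zero offset, non-negative timestamps assumed).
        long lastStart = timestamp - (timestamp % SLIDE);
        for (long start = lastStart; start > timestamp - SIZE; start -= SLIDE) {
            windows.add(new long[] {start, start + SIZE});
        }
        return windows;
    }

    /** A window [start, end) fires once the watermark reaches its last timestamp, end - 1. */
    static boolean fires(long[] window, long watermark) {
        return watermark >= window[1] - 1;
    }

    public static void main(String[] args) {
        List<long[]> windows = assignWindows(10 * DAY + 5);
        System.out.println(windows.size()); // 7 overlapping windows
        long[] earliest = windows.get(windows.size() - 1);
        System.out.println(fires(earliest, Long.MIN_VALUE)); // false: no watermark has arrived
        System.out.println(fires(earliest, earliest[1] - 1)); // true
    }
}
```

If the window operator's watermark is pinned at `Long.MIN_VALUE` because one of its inputs never reports, `fires` is false for every window, which matches the "windows are never triggered" symptom.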
> > > >
> > > > Best,
> > > > Paul Lam
> > > >
> > > > On Apr 17, 2019, at 23:04, [hidden email] wrote:
> > > >
> > > > `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > ```java
> > > > DataStream<Trip> trips =
> > > >         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >     @Override
> > > >     public long extractTimestamp(Trip trip) {
> > > >         return trip.endTime.getTime();
> > > >     }
> > > > });
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
> > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > >
> > > > But not after `keyBy` and `process`:
> > > > ```java
> > > > DataStream<Trip> trips = env.addSource(consumer);
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<FeaturizedTrip> featurizedUserTrips =
> > > >         userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> > > >                 new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > >     @Override
> > > >     public long extractTimestamp(FeaturizedTrip trip) {
> > > >         return trip.endTime.getTime();
> > > >     }
> > > > });
> > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > > Windows are never triggered.
> > > >
> > > > Is it a bug or expected behavior? If the latter, where is it documented?
|
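Dawid's and Guowei's explanations can be checked against the original question with a small plain-Java simulation. It is a sketch under stated assumptions, not Flink code: parallelism 2; a single key, so `keyBy` routes every element to keyed subtask 0; an out-of-orderness bound of 1 in arbitrary time units instead of days; and an extractor that reports `Long.MIN_VALUE` (no watermark) until it has seen an element. `AssignerPlacementSim` and its methods are hypothetical names. The third case models the redistribution as round-robin, which is what `rebalance()` does, to keep the result deterministic; the `shuffle()` Guowei suggested picks channels at random instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical simulation (not Flink code): parallelism 2, one key, arbitrary time units. */
class AssignerPlacementSim {
    static final long BOUND = 1; // stand-in for Time.days(1)
    static final List<Long> SOURCE_0 = Arrays.asList(10L, 12L);
    static final List<Long> SOURCE_1 = Arrays.asList(11L, 13L);

    /** Bounded out-of-orderness: max timestamp seen minus BOUND, or Long.MIN_VALUE if nothing was seen. */
    static long extractorWatermark(List<Long> timestamps) {
        long wm = Long.MIN_VALUE;
        for (long ts : timestamps) {
            wm = Math.max(wm, ts - BOUND);
        }
        return wm;
    }

    /** A task fed by several channels advances only to the minimum of their watermarks. */
    static long minOf(long... watermarks) {
        long min = Long.MAX_VALUE;
        for (long w : watermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    /** Assigner chained to each source subtask, i.e. before keyBy. */
    static long windowAllWatermarkBeforeKeyBy() {
        long wm0 = extractorWatermark(SOURCE_0);
        long wm1 = extractorWatermark(SOURCE_1);
        // keyBy is all-to-all: both keyed subtasks see both watermarks, even the one with no data.
        long keyed0 = minOf(wm0, wm1);
        long keyed1 = minOf(wm0, wm1);
        // timeWindowAll runs as a single task reading from both keyed subtasks.
        return minOf(keyed0, keyed1);
    }

    /** Assigner after keyBy: the single key sends every element to keyed subtask 0. */
    static long windowAllWatermarkAfterKeyBy() {
        List<Long> all = new ArrayList<>(SOURCE_0);
        all.addAll(SOURCE_1);
        long wm0 = extractorWatermark(all);
        long wm1 = extractorWatermark(Collections.<Long>emptyList()); // idle subtask: no watermark
        return minOf(wm0, wm1);
    }

    /** Redistributing round-robin before the assigner, so both extractors receive elements. */
    static long windowAllWatermarkAfterRedistribution() {
        List<Long> all = new ArrayList<>(SOURCE_0);
        all.addAll(SOURCE_1);
        List<Long> sub0 = new ArrayList<>();
        List<Long> sub1 = new ArrayList<>();
        for (int i = 0; i < all.size(); i++) {
            (i % 2 == 0 ? sub0 : sub1).add(all.get(i));
        }
        return minOf(extractorWatermark(sub0), extractorWatermark(sub1));
    }

    public static void main(String[] args) {
        System.out.println("before keyBy:       " + windowAllWatermarkBeforeKeyBy());         // 11
        System.out.println("after keyBy:        " + windowAllWatermarkAfterKeyBy());          // Long.MIN_VALUE
        System.out.println("after redistribute: " + windowAllWatermarkAfterRedistribution()); // 10
    }
}
```

Before `keyBy`, both extractors see data and `keyBy` forwards their watermarks to every keyed subtask, so the window operator advances to 11. After `keyBy`, the idle subtask never emits, the minimum stays at `Long.MIN_VALUE`, and no window fires. Redistributing the elements before the assigner restores progress.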