Hi,

On Fri, 10 May 2019 at 16:55, an0 <[hidden email]> wrote:

> Q2: after a, map(A), and map(B) would work fine. Assign watermarks [...]

Yes, that is correct. A window operator task evaluates a window when its event time (advanced by watermarks) triggers the computation. Window operator tasks do not synchronize their computation. However, they are usually synchronized due to the broadcasted watermarks.

> Although C.2 does not receive data, it receives watermarks because WMs are [...]

Yes, I think that's correct. A timestamp assigner / watermark generator swallows all WMs it receives and generates new ones. However, if the assigner does not see any records, the WM will not move forward.

> On 2019/05/10 10:38:35, Fabian Hueske <[hidden email]> wrote: [...]
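A minimal sketch of the stall described above, written as a plain Java model rather than Flink code. It assumes a bounded-out-of-orderness generator whose watermark is the largest timestamp seen minus a fixed bound, and a downstream task whose event time is the minimum over its inputs. The names `WatermarkStallSketch`, `generatorWatermark` and `combine` are made up for illustration.

```java
import java.util.List;

/** Simplified model (not Flink code) of a watermark generator placed after keyBy(). */
final class WatermarkStallSketch {

    /** Watermark a generator subtask would emit for the record timestamps it has seen. */
    static long generatorWatermark(List<Long> seenTimestamps, long maxOutOfOrderness) {
        // A subtask that has seen no records has nothing to derive a watermark from.
        if (seenTimestamps.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long maxTimestamp = Long.MIN_VALUE;
        for (long ts : seenTimestamps) {
            maxTimestamp = Math.max(maxTimestamp, ts);
        }
        return maxTimestamp - maxOutOfOrderness;
    }

    /** Event time of a task with several inputs: the minimum of the input watermarks. */
    static long combine(long... inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : inputWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        long bound = 1_000L;
        // C.1 receives every record because there is only one key; C.2 receives none.
        long c1 = generatorWatermark(List.of(5_000L, 7_000L, 6_000L), bound);
        long c2 = generatorWatermark(List.of(), bound);
        // D (timeWindowAll) reads from both C.1 and C.2.
        long d = combine(c1, c2);
        System.out.println("C.1 watermark: " + c1);
        System.out.println("C.2 advanced: " + (c2 != Long.MIN_VALUE));
        System.out.println("D advanced: " + (d != Long.MIN_VALUE));
    }
}
```

In this model D never advances, however far C.1 gets, because the minimum is pinned by the subtask that saw no records.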
Thanks everyone, I learned a lot through this single thread!
On 2019/05/13 07:19:30, Fabian Hueske <[hidden email]> wrote:
> Hi,
>
> On Fri, 10 May 2019 at 16:55, an0 <[hidden email]> wrote:
>
> > > Q2: after a, map(A), and map(B) would work fine. Assigning watermarks immediately after a keyBy() is not a good idea, because 1) the records are shuffled and it's hard to reason about ordering, and 2) you lose the KeyedStream property and would have to keyBy() again (unless you use reinterpretAsKeyedStream).
> >
> > I know it is better to assignTimestampsAndWatermarks as early as possible. I intentionally put it after keyBy to check my understanding of this specific situation because I may have to use assignTimestampsAndWatermarks after keyBy in my application. I didn't make my question's intention clear though.
> >
> > I'm glad to learn another trick from you: reinterpretAsKeyedStream :). Let's assume it is keyBy. assignTimestampsAndWatermarks.reinterpretAsKeyedStream.timeWindow(C).
> >
> > I wanted to ask:
> > Because it is using keyed windows, every key's window is fired independently, even if I assignTimestampsAndWatermarks after keyBy and C.2 doesn't have any data so generates no watermarks, it won't affect the firing of C.1's windows. Is this understanding correct?
>
> Yes, that is correct. A window operator task evaluates a window when its event time (advanced by watermarks) triggers the computation. Window operator tasks do not synchronize their computation. However, they are usually synchronized due to the broadcasted watermarks.
>
> > > Although C.2 does not receive data, it receives watermarks because WMs are broadcasted. They flow to any task that is reachable by any event. The case that all keys fall into C.1 is not important because a record for C.2 might arrive at any point in time. Also Flink does not give any guarantees about how keys (or rather key groups) are assigned to tasks. If you rescale the application to a parallelism of 3, the active key group might be scheduled to C.2 or C.3.
> > >
> > > Long story short, D makes progress in event time because watermarks are broadcasted.
> >
> > I know watermarks are broadcasted. But I'm using assignTimestampsAndWatermarks after keyBy, so C.2 doesn't receive watermarks, it creates watermarks from its received data. Since it doesn't receive any data, it doesn't create any watermarks. D couldn't make progress because one of its inputs, C.2, doesn't make progress. Is this understanding correct?
>
> Yes, I think that's correct. A timestamp assigner / watermark generator swallows all WMs it receives and generates new ones. However, if the assigner does not see any records, the WM will not move forward.
>
> > On 2019/05/10 10:38:35, Fabian Hueske <[hidden email]> wrote:
> > > Hi,
> > >
> > > Again answers below ;-)
> > >
> > > On Thu, 9 May 2019 at 17:12, an0 <[hidden email]> wrote:
> > >
> > > > You are right, thanks. But something is still not totally clear to me. I'll reuse your diagram with a little modification:
> > > >
> > > > DataStream<X> a = ...
> > > > a.map(A).map(B).keyBy(....).timeWindow(C)
> > > >
> > > > and execute this with parallelism 2. However, keyBy only generates one single key value, and assume they all go to C.1. Does the data flow look like this?
> > > >
> > > > A.1 -- B.1 -----/-- C.1
> > > >               /
> > > > A.2 -- B.2 --/      C.2
> > > >
> > > > Will the lack of data into C.2 prevent C.1's windows from firing? Will the location of assignTimestampsAndWatermarks (after a, after map(A), after map(B), after keyBy) matter for the firing of C.1's windows?
> > > >
> > > > By my understanding, the answers are "no" and "no". Correct?
> > >
> > > Q1: no. Watermarks are propagated even in case of skewed key distribution.
> > > C.2 will also advance its event-time clock (based on the WMs) and forward new WMs.
> > > Q2: after a, map(A), and map(B) would work fine. Assigning watermarks immediately after a keyBy() is not a good idea, because 1) the records are shuffled and it's hard to reason about ordering, and 2) you lose the KeyedStream property and would have to keyBy() again (unless you use reinterpretAsKeyedStream).
> > >
> > > > Now comes the *silly* question: does C.2 exist at all? Since there is only one key value, only one C instance is needed. I could see that C.2 as a physical instance may exist, but as a logical instance it shouldn't exist in the diagram because it is unused. I feel the answer to this silly question may be the most important in understanding my (and perhaps many others') misunderstanding of situations like this.
> > > >
> > > > If C.2 exists just because parallelism is set to 2, even though it is not logically needed, and it also serves as an input to the next operator if there is one, then the mystery is completely solved for me.
> > >
> > > C.2 exists. Flink does not create a flow topology based on data values. As soon as there is a record with a key that would need to go to C.2, we would need it.
> > >
> > > > Use a concrete example:
> > > >
> > > > DataStream<X> a = ...
> > > > a.map(A).map(B).keyBy(....).assignTimestampsAndWatermarks(C).timeWindowAll(D)
> > > >
> > > > A.1 -- B.1 -----/-- C.1 --\
> > > >               /            D
> > > > A.2 -- B.2 --/      C.2 --/
> > > >
> > > > D's watermark cannot progress because C.2's watermark cannot progress, because C.2 doesn't have any input data, even though C.2 is not logically needed but it does exist and it ruins everything :p. Is this understanding correct?
> > >
> > > Although C.2 does not receive data, it receives watermarks because WMs are broadcasted. They flow to any task that is reachable by any event. The case that all keys fall into C.1 is not important because a record for C.2 might arrive at any point in time. Also Flink does not give any guarantees about how keys (or rather key groups) are assigned to tasks. If you rescale the application to a parallelism of 3, the active key group might be scheduled to C.2 or C.3.
> > >
> > > Long story short, D makes progress in event time because watermarks are broadcasted.
> > >
> > > > On 2019/05/09 10:01:44, Fabian Hueske <[hidden email]> wrote:
> > > > > Hi,
> > > > >
> > > > > Please find my response below.
> > > > >
> > > > > On Fri, 3 May 2019 at 16:16, an0 <[hidden email]> wrote:
> > > > >
> > > > > > Thanks, but it doesn't seem to cover this rule:
> > > > > > --- Quote
> > > > > > Watermarks are generated at, or directly after, source functions. Each parallel subtask of a source function usually generates its watermarks independently. These watermarks define the event time at that particular parallel source.
> > > > > >
> > > > > > As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an operator advances its event time, it generates a new watermark downstream for its successor operators.
> > > > > >
> > > > > > Some operators consume multiple input streams; a union, for example, or operators following a keyBy(…) or partition(…) function. Such an operator’s current event time is the minimum of its input streams’ event times. As its input streams update their event times, so does the operator.
> > > > > > --- End Quote
> > > > > >
> > > > > > The most relevant part, I believe, is this:
> > > > > > "Some operators consume multiple input streams…operators following a keyBy(…) function. Such an operator’s current event time is the minimum of its input streams’ event times."
> > > > > >
> > > > > > But the wording of "current event time is the minimum of its input streams’ event times" actually implies that the input streams (produced by keyBy) have different watermarks, the exact opposite of what you just explained.
> > > > >
> > > > > IMO, the description in the documentation is correct, but looks at the issue from a different angle.
> > > > > An operator task typically has many inputs from which it receives records. Depending on the number of input operators (one or more) and the connection between the operator and its input operators (forward, partition, broadcast), an operator task might have a connection to one, some, or all tasks of its input operators. Each input task can send a different watermark, but each task will also send the same watermark to all its output tasks.
> > > > >
> > > > > So, this is a matter of distinguishing receiving (different) watermarks and emitting (the same) watermarks.
> > > > >
> > > > > Best, Fabian
> > > > >
> > > > > > On 2019/05/03 07:32:07, Fabian Hueske <[hidden email]> wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > this should be covered here:
> > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams
> > > > > > >
> > > > > > > Best, Fabian
> > > > > > >
> > > > > > > On Thu, 2 May 2019 at 17:48, an0 <[hidden email]> wrote:
> > > > > > >
> > > > > > > > This explanation is exactly what I'm looking for, thanks! Is such an important rule documented anywhere in the official document?
> > > > > > > >
> > > > > > > > On 2019/04/30 08:47:29, Fabian Hueske <[hidden email]> wrote:
> > > > > > > > > An operator task broadcasts its current watermark to all downstream tasks that might receive its records.
> > > > > > > > > If you have the following code:
> > > > > > > > >
> > > > > > > > > DataStream<X> a = ...
> > > > > > > > > a.map(A).map(B).keyBy(....).window(C)
> > > > > > > > >
> > > > > > > > > and execute this with parallelism 2, your plan looks like this
> > > > > > > > >
> > > > > > > > > A.1 -- B.1 --\--/-- C.1
> > > > > > > > >               X
> > > > > > > > > A.2 -- B.2 --/--\-- C.2
> > > > > > > > >
> > > > > > > > > A.1 will propagate its watermarks to B.1 because only B.1 will receive its output events.
> > > > > > > > > However, B.1 will propagate its watermarks to C.1 and C.2 because the output of B.1 is partitioned and all C tasks might receive output events from B.1.
> > > > > > > > >
> > > > > > > > > Best, Fabian
> > > > > > > > >
> > > > > > > > > On Mon, 29 Apr 2019 at 20:06, an0 <[hidden email]> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks very much. It definitely explains the problem I'm seeing. However, something I need to confirm:
> > > > > > > > > > You say "Watermarks are broadcasted/forwarded anyway."
> > > > > > > > > > Do you mean, in assignTimestampsAndWatermarks.keyBy.window, it doesn't matter what data flows through a specific key's stream, all key streams have the same watermarks? So time-wise, `window` behaves as if `keyBy` is not there at all?
> > > > > > > > > >
> > > > > > > > > > On 2019/04/26 06:34:10, Dawid Wysakowicz <[hidden email]> wrote:
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > Watermarks are meta events that travel independently of data events.
> > > > > > > > > > >
> > > > > > > > > > > 1) If you assignTimestampsAndWatermarks before keyBy, all parallel instances of trips have some data (this is my assumption) so Watermarks can be generated. Afterwards even if some of the keyed partitions have no data, Watermarks are broadcasted/forwarded anyway. In other words if at some point Watermarks were generated for all partitions of a single stage, they will be forwarded beyond this point.
> > > > > > > > > > >
> > > > > > > > > > > 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign watermarks for an empty partition which produces no Watermarks at all for this partition, therefore there is no progress beyond this point.
> > > > > > > > > > >
> > > > > > > > > > > I hope this clarifies it a bit.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > >
> > > > > > > > > > > Dawid
> > > > > > > > > > >
> > > > > > > > > > > On 25/04/2019 16:49, an0 wrote:
> > > > > > > > > > > > If my understanding is correct, then why does `assignTimestampsAndWatermarks` before `keyBy` work? The `timeWindowAll` stream's input streams are task 1 and task 2, with task 2 idling, no matter whether `assignTimestampsAndWatermarks` is before or after `keyBy`, because whether task 2 receives elements only depends on the key distribution, has nothing to do with timestamp assignment, right?
> > > > > > > > > > > >
> > > > > > > > > > > >                                                    /key 1 trips\
> > > > > > > > > > > >                                                   /             \
> > > > > > > > > > > > (A) trips--> assignTimestampsAndWatermarks-->keyBy               timeWindowAll
> > > > > > > > > > > >                                                   \    idle     /
> > > > > > > > > > > >                                                    \key 2 trips/
> > > > > > > > > > > >
> > > > > > > > > > > >                   /key 1 trips--> assignTimestampsAndWatermarks\
> > > > > > > > > > > >                  /                                              \
> > > > > > > > > > > > (B) trips-->keyBy                                                timeWindowAll
> > > > > > > > > > > >                  \       idle                                   /
> > > > > > > > > > > >                   \key 2 trips--> assignTimestampsAndWatermarks/
> > > > > > > > > > > >
> > > > > > > > > > > > How are things different between A and B from `timeWindowAll`'s perspective?
> > > > > > > > > > > >
> > > > > > > > > > > > BTW, thanks for the webinar link, I'll check it later.
> > > > > > > > > > > >
> > > > > > > > > > > > On 2019/04/25 08:30:20, Dawid Wysakowicz <[hidden email]> wrote:
> > > > > > > > > > > >> Hi,
> > > > > > > > > > > >>
> > > > > > > > > > > >> Yes I think your explanation is correct. I can also recommend Seth's webinar where he talks about debugging Watermarks[1]
> > > > > > > > > > > >>
> > > > > > > > > > > >> Best,
> > > > > > > > > > > >>
> > > > > > > > > > > >> Dawid
> > > > > > > > > > > >>
> > > > > > > > > > > >> [1] https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> > > > > > > > > > > >>
> > > > > > > > > > > >> On 22/04/2019 22:55, an0 wrote:
> > > > > > > > > > > >>> Thanks, I feel I'm getting closer to the truth.
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> So parallelism is the cause? Say my parallelism is 2. Does that mean I get 2 tasks running after `keyBy` even if all elements have the same key so go to 1 downstream (say task 1)? And it is the other task (task 2) with no incoming data that caused the `timeWindowAll` stream to be unable to progress? Because both task 1 and task 2 are its input streams and one is idling so its event time cannot make progress?
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> On 2019/04/22 01:57:39, Guowei Ma <[hidden email]> wrote:
> > > > > > > > > > > >>>> Hi,
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> BoundedOutOfOrdernessTimestampExtractors can send a WM at least after it receives an element.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> For after keyBy:
> > > > > > > > > > > >>>> Flink uses the hash code of the key and the parallelism of the downstream operator to decide which subtask would receive the element. This means if your key is always the same, all the sources will only send the elements to the same downstream task, for example only no. 3 BoundedOutOfOrdernessTimestampExtractor.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> For before keyBy:
> > > > > > > > > > > >>>> In your case, the Source and BoundedOutOfOrdernessTimestampExtractors would be chained together, which means every BoundedOutOfOrdernessTimestampExtractors will receive elements.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Best,
> > > > > > > > > > > >>>> Guowei
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> On Fri, 19 Apr 2019 at 22:41, an0 <[hidden email]> wrote:
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> Hi,
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> First of all, thank you for the `shuffle()` tip. It works. However, I still don't understand why it doesn't work without calling `shuffle()`.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips? All the trips have keys and timestamps. As I said in my reply to Paul, I see the same watermarks being extracted.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> How could calling `assignTimestampsAndWatermarks` before VS after `keyBy` matter? My understanding is any specific window for a specific key always receives the exactly same data, and the calling order of `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> To make `keyBy` as irrelevant as possible, I tried letting it always return the same key so that there is only 1 keyed stream and it is exactly the same as the original unkeyed stream. It still doesn't trigger windows:
> > > > > > > > > > > >>>>> ```java
> > > > > > > > > > > >>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > > > > > > > > >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> > > > > > > > > > > >>>>> DataStream<Trip> featurizedUserTrips =
> > > > > > > > > > > >>>>>     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > > > > > > > > > >>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > > > > > > > >>>>>             @Override
> > > > > > > > > > > >>>>>             public long extractTimestamp(Trip trip) {
> > > > > > > > > > > >>>>>                 return trip.endTime.getTime();
> > > > > > > > > > > >>>>>             }
> > > > > > > > > > > >>>>>         });
> > > > > > > > > > > >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > > > > > > > > >>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > > > >>>>> ```
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> It makes no sense to me. Please help me understand why it doesn't work.
> > > > > > > > > > > >>>>> Thanks!
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> On 2019/04/19 04:14:31, Guowei Ma <[hidden email]> wrote:
> > > > > > > > > > > >>>>>> Hi,
> > > > > > > > > > > >>>>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors receive the elements (trips). If that is the case, a BoundedOutOfOrdernessTimestampExtractor that does not receive any element would not send a WM. Because of that, the timeWindowAll operator could not be triggered.
> > > > > > > > > > > >>>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in your second case and check if the window is triggered. If it could be triggered you could check the distribution of elements generated by the source.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Best,
> > > > > > > > > > > >>>>>> Guowei
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> On Fri, 19 Apr 2019 at 04:10, [hidden email] <[hidden email]> wrote:
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>> I don't think it is the watermark. I see the same watermarks from the two versions of code.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> The processing on the keyed stream doesn't change event time at all. I can simply change my code to use `map` on the keyed stream to return back the input data, so that the window operator receives the exactly same data. The only difference is when I do `assignTimestampsAndWatermarks`. The result is the same, `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > > > > > > > > >>>>>>> ```java
> > > > > > > > > > > >>>>>>> DataStream<Trip> trips =
> > > > > > > > > > > >>>>>>>     env.addSource(consumer).assignTimestampsAndWatermarks(
> > > > > > > > > > > >>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > > > > > > > >>>>>>>             @Override
> > > > > > > > > > > >>>>>>>             public long extractTimestamp(Trip trip) {
> > > > > > > > > > > >>>>>>>                 return trip.endTime.getTime();
> > > > > > > > > > > >>>>>>>             }
> > > > > > > > > > > >>>>>>>         });
> > > > > > > > > > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > > > > > > >>>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > > > > > > > > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > > > > > > > > >>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > > > >>>>>>> ```
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > > > > > > > > > >>>>>>> ```java
> > > > > > > > > > > >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > > > > > > > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > > > > > > >>>>>>> DataStream<Trip> featurizedUserTrips =
> > > > > > > > > > > >>>>>>>     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > > > > > > > > > >>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > > > > > > > >>>>>>>             @Override
> > > > > > > > > > > >>>>>>>             public long extractTimestamp(Trip trip) {
> > > > > > > > > > > >>>>>>>                 return trip.endTime.getTime();
> > > > > > > > > > > >>>>>>>             }
> > > > > > > > > > > >>>>>>>         });
> > > > > > > > > > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > > > > > > > > >>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > > > >>>>>>> ```
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> It feels like a bug to me, but I want to confirm it before I file the bug report.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> On 2019/04/18 03:38:34, Paul Lam <[hidden email]> wrote:
> > > > > > > > > > > >>>>>>>> Hi,
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Could you check the watermark of the window operator? One possible situation would be some of the keys are not getting enough inputs, so their watermarks remain below the window end time and hold the window operator watermark back.
> > > > > > > > > > > >>>>>>>> IMO, it's a good practice to assign watermarks earlier in the data pipeline.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Best,
> > > > > > > > > > > >>>>>>>> Paul Lam
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>> On 17 Apr 2019, at 23:04, [hidden email] wrote:
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > > > > > > > > >>>>>>>>> ```java
> > > > > > > > > > > >>>>>>>>> DataStream<Trip> trips =
> > > > > > > > > > > >>>>>>>>>     env.addSource(consumer).assignTimestampsAndWatermarks(
> > > > > > > > > > > >>>>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > > > > > > > >>>>>>>>>             @Override
> > > > > > > > > > > >>>>>>>>>             public long extractTimestamp(Trip trip) {
> > > > > > > > > > > >>>>>>>>>                 return trip.endTime.getTime();
> > > > > > > > > > > >>>>>>>>>             }
> > > > > > > > > > > >>>>>>>>>         });
> > > > > > > > > > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > > > > > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
> > > > > > > > > > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > > > > > > > >>>>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > > > >>>>>>>>> ```
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> But not after `keyBy` and `process`:
> > > > > > > > > > > >>>>>>>>> ```java
> > > > > > > > > > > >>>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > > > > > > > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > > > > > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> > > > > > > > > > > >>>>>>>>>     userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> > > > > > > > > > > >>>>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > > > > > > > > > >>>>>>>>>             @Override
> > > > > > > > > > > >>>>>>>>>             public long extractTimestamp(FeaturizedTrip trip) {
> > > > > > > > > > > >>>>>>>>>                 return trip.endTime.getTime();
> > > > > > > > > > > >>>>>>>>>             }
> > > > > > > > > > > >>>>>>>>>         });
> > > > > > > > > > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > > > > > > > >>>>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > > > >>>>>>>>> ```
> > > > > > > > > > > >>>>>>>>> Windows are never triggered.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> Is it a bug or expected behavior? If the latter, where is it documented?
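For later readers, here is a rough model of the other case discussed in the thread, where timestamps are assigned before `keyBy` and watermarks are broadcast across the shuffle. It is plain Java, not Flink code; it assumes every upstream subtask sends its watermark to every downstream subtask and that each downstream subtask takes the minimum of what it receives. The names `WatermarkBroadcastSketch`, `broadcast` and `min` are made up for illustration.

```java
/** Simplified model (not Flink code) of watermarks broadcast across a keyBy() shuffle. */
final class WatermarkBroadcastSketch {

    /** Minimum of the given watermarks, i.e. the event time of a task reading all of them. */
    static long min(long... watermarks) {
        long result = Long.MAX_VALUE;
        for (long wm : watermarks) {
            result = Math.min(result, wm);
        }
        return result;
    }

    /**
     * Every upstream subtask sends its watermark to every downstream subtask,
     * so each downstream subtask sees the same inputs regardless of where records go.
     */
    static long[] broadcast(long[] upstreamWatermarks, int downstreamParallelism) {
        long[] downstream = new long[downstreamParallelism];
        for (int i = 0; i < downstreamParallelism; i++) {
            downstream[i] = min(upstreamWatermarks);
        }
        return downstream;
    }

    public static void main(String[] args) {
        // B.1 and B.2 both saw records, because the generator sits before keyBy().
        long[] b = {6_000L, 5_500L};
        // C.2 may receive no records at all, but it still receives both watermarks.
        long[] c = broadcast(b, 2);
        long d = min(c);
        System.out.println("C.1 watermark: " + c[0]);
        System.out.println("C.2 watermark: " + c[1]);
        System.out.println("D watermark: " + d);
    }
}
```

Under these assumptions the idle-by-data subtask C.2 advances together with C.1, which is why D keeps making progress in variant (A) but not in variant (B).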