Hi,
I have the following pipeline:
1. A one-hour window that counts the number of records.
2. A one-day window that consumes the aggregated data from #1 and emits the highest hourly count of that day.
3. A union of #1 and #2.
4. A logic operator that consumes the data from #3, keeps a ListState of the #2 results, applies some logic to the #1 records based on that state (e.g. comparing a single hour against the history of the max hours over the last X days), and emits the result.

Timestamps and watermarks are assigned with BoundedOutOfOrdernessTimestampExtractor (event time), and I allow lateness of 3 hours.

The problem is that when I unit-test the whole pipeline, the data from #1 reaches #4 before #4 has received the #2 data through #3, so #4 doesn't have any state yet (the state is always empty when the records from #1 arrive). My source in the tests is a collection that represents the records.

Is there any way I can solve this? I appreciate any help you can provide.
Cheers,
Avi
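To make the setup above concrete, here is a plain-Python model of the data flow; it does not use any Flink API. Event times are assumed to be epoch seconds, and the names `hourly_counts`, `daily_max` and `compare_operator` are hypothetical. The one real assumption, taken from the test behaviour described above, is the arrival order at #4: every output of #1 before any output of #2. Under that ordering the history seen by each hourly record is empty.

```python
from collections import defaultdict

SECONDS_PER_HOUR = 3600
HOURS_PER_DAY = 24


def hourly_counts(event_times):
    """Step #1: count records per event-time hour (event_times in epoch seconds)."""
    counts = defaultdict(int)
    for t in event_times:
        counts[t // SECONDS_PER_HOUR] += 1
    return sorted(counts.items())  # [(hour_index, count), ...]


def daily_max(hourly):
    """Step #2: the highest hourly count of each day, computed from step #1's output."""
    best = {}
    for hour, count in hourly:
        day = hour // HOURS_PER_DAY
        best[day] = max(best.get(day, 0), count)
    return sorted(best.items())  # [(day_index, max_count), ...]


def compare_operator(unioned):
    """Step #4: consumes the union (#3) in arrival order. Daily maxima are appended
    to a list standing in for the ListState; each hourly count is paired with a
    snapshot of that history, which is where the real comparison would happen."""
    history = []
    results = []
    for kind, key, value in unioned:
        if kind == "daily_max":
            history.append(value)
        else:  # kind == "hourly"
            results.append((key, value, list(history)))
    return results


# Assumed arrival order from the tests: all of #1 before any of #2.
hourly = hourly_counts([0, 10, 3600, 3700, 3800, 86400, 90000])
daily = daily_max(hourly)
unioned = [("hourly", h, c) for h, c in hourly] + [("daily_max", d, m) for d, m in daily]
for hour, count, seen in compare_operator(unioned):
    print(hour, count, seen)  # 'seen' is always [] under this ordering
```

Putting the daily maxima first in `unioned` gives every hourly record a non-empty history, which is the behaviour the logic in #4 expects.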
Hi,
You can merge the logic of #2 into #4; it will be much simpler.
Best,
Kurt
On Wed, Dec 25, 2019 at 7:36 PM Avi Levi <[hidden email]> wrote:
I'm not sure I see how it is simpler. #2 is a time window per day; it emits the highest hourly count for that day. #4 is not a time window; it keeps a history of several days. If I move the logic of #2 into #4, I will need to manage it with timers, correct?
On Thu, Dec 26, 2019 at 6:28 AM Kurt Young <[hidden email]> wrote:
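The timer question can be illustrated without Flink. The plain-Python class below keeps a running maximum per day and flushes it when a simulated event-time timer, set at the end of the day plus the 3 hours of allowed lateness from the original setup, is passed by the watermark. In a real job this role would presumably be played by a `KeyedProcessFunction` that registers timers via `ctx.timerService().registerEventTimeTimer(...)` and flushes in `onTimer`; the class, its methods and the hour-based time unit here are hypothetical simplifications.

```python
HOURS_PER_DAY = 24


class DailyMaxWithTimers:
    """Plain-Python simulation of a keyed operator that keeps a running maximum per
    day and flushes it when a simulated event-time timer fires. Time is in hours."""

    def __init__(self, lateness_hours=0):
        self.lateness_hours = lateness_hours
        self.pending = {}   # day_index -> running max of hourly counts
        self.timers = {}    # timer timestamp (hour) -> day_index it flushes
        self.history = []   # flushed (day_index, max_count), in firing order

    def process_hourly(self, hour, count):
        day = hour // HOURS_PER_DAY
        self.pending[day] = max(self.pending.get(day, 0), count)
        # Timer at the end of the day plus the allowed lateness; registering the
        # same timestamp again is a no-op, as with Flink's per-key event-time timers.
        self.timers[(day + 1) * HOURS_PER_DAY + self.lateness_hours] = day

    def advance_watermark(self, watermark_hour):
        """Fire, in timestamp order, every timer whose timestamp is <= the watermark."""
        for ts in sorted(t for t in self.timers if t <= watermark_hour):
            day = self.timers.pop(ts)
            self.history.append((day, self.pending.pop(day)))


op = DailyMaxWithTimers(lateness_hours=3)
op.process_hourly(0, 2)
op.process_hourly(1, 3)
op.process_hourly(24, 1)
op.advance_watermark(24)   # day 0's timer is at hour 27, so nothing fires yet
op.advance_watermark(27)
print(op.history)          # [(0, 3)]
```

Because the flush depends only on the watermark, this variant also emits a day in which some hours had no records at all.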
Let's say you keep your #1, which does the hourly counting, and emit its results to the new merged #2 (#2 and #4 combined). The new #2 would first keep all hourly results in state, and also keep track of whether it has already received all 24 results belonging to the same day. Once you have received all 24 counts for the same day, you can start your logic. You could also decide what kind of data you want to keep in state after that.
Best,
Kurt
On Thu, Dec 26, 2019 at 1:14 PM Avi Levi <[hidden email]> wrote:
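The suggestion above can be sketched as a single per-key operator in plain Python (again, not Flink API code). `MergedDailyOperator`, its methods, and the comparison rule (flag an hour whose count exceeds the highest daily maximum of the last X days) are hypothetical stand-ins for the real logic in #4; a `deque` with `maxlen` plays the role of the bounded history kept in state.

```python
from collections import defaultdict, deque

HOURS_PER_DAY = 24


class MergedDailyOperator:
    """Hypothetical merged #2 + #4 for one key: buffers hourly counts per day and
    runs the day's logic only once all 24 hourly results have arrived."""

    def __init__(self, history_days):
        self.buffer = defaultdict(dict)            # day_index -> {hour_index: count}
        self.history = deque(maxlen=history_days)  # daily maxima of the last X days

    def process_hourly(self, hour, count):
        """Return [] until the day is complete, then one (hour, count, flagged)
        tuple per hour of that day."""
        day = hour // HOURS_PER_DAY
        self.buffer[day][hour] = count  # a repeated hour overwrites its earlier count
        if len(self.buffer[day]) < HOURS_PER_DAY:
            return []
        hours = self.buffer.pop(day)
        # Placeholder comparison: flag hours above the highest recent daily maximum.
        threshold = max(self.history) if self.history else None
        results = [(h, c, threshold is not None and c > threshold)
                   for h, c in sorted(hours.items())]
        self.history.append(max(hours.values()))
        return results


if __name__ == "__main__":
    op = MergedDailyOperator(history_days=2)
    for h in range(24):                  # day 0: counts 1..24, no history yet
        op.process_hourly(h, h + 1)
    for h in range(24, 48):              # day 1: a spike of 30 at hour 30
        out = op.process_hourly(h, 30 if h == 30 else 10)
    print([h for h, _, flagged in out if flagged])  # [30]
```

One caveat, assuming #1 is a regular tumbling event-time window: Flink emits nothing for a window that received no elements, so a day with a silent hour never reaches 24 entries in this sketch, and a late re-firing of an hour after its day has completed starts a new buffer that never completes. Flushing on an end-of-day timer instead of counting avoids the first problem.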