testing - syncing data timeline

avilevi
Hi,

I have the following pipeline:
1. A one-hour window that counts the number of records.
2. A one-day window that takes the aggregated output of #1 and emits the highest hourly count of that day.
3. A union of #1 and #2.
4. A logic operator that consumes #3, keeps a ListState of the #2 results, and applies some logic to the #1 results based on that state (e.g., comparing a single hour's count with the history of daily maximum hours over the last X days), then emits the result.

Timestamps and watermarks are assigned with a BoundedOutOfOrdernessTimestampExtractor (event time), and I allow a lateness of 3 hours.
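
To make the data flow concrete, steps #1 and #2 can be modeled as plain functions over a test collection. This is a minimal, framework-free Python sketch, not Flink code: it assumes each record is just an epoch-millisecond event timestamp, that windows are aligned to the epoch (UTC days), and it ignores watermarks and lateness. The names `hourly_counts` and `daily_max` are hypothetical.

```python
from collections import Counter, defaultdict

HOUR_MS = 60 * 60 * 1000
DAY_MS = 24 * HOUR_MS


def hourly_counts(timestamps_ms):
    """Step #1: count records per one-hour tumbling event-time window.

    Returns {hour_window_start_ms: count}. Like a Flink tumbling window,
    an hour with no records produces no entry at all.
    """
    return dict(Counter(ts - ts % HOUR_MS for ts in timestamps_ms))


def daily_max(hourly):
    """Step #2: highest hourly count per one-day window.

    Takes the output of step #1 and returns {day_window_start_ms: max_count}.
    """
    per_day = defaultdict(int)
    for hour_start, count in hourly.items():
        day_start = hour_start - hour_start % DAY_MS
        per_day[day_start] = max(per_day[day_start], count)
    return dict(per_day)


if __name__ == "__main__":
    # Three records in hour 0 and one in hour 5 of day 0; two in hour 1 of day 1.
    events = [0, 10, 20, 5 * HOUR_MS, DAY_MS + HOUR_MS, DAY_MS + HOUR_MS + 1]
    hourly = hourly_counts(events)
    print(hourly)
    print(daily_max(hourly))
```

The sketch makes the dependency visible: `daily_max` can only run on the output of `hourly_counts`, so a day's maximum is never available before that day's hourly counts have been produced.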

The problem is that when I unit-test the whole pipeline, the data from #1 reaches #4 before #4 has received any #2 data through the union (#3), so it has no state yet (the state is always empty when the records from #1 arrive).
My source in the tests is a collection that represents the records.
Is there any way I can solve this?
[Attached screenshot: Screen Shot 2019-12-25 at 13.04.17.png]
I appreciate any help you can provide.
Cheers
Avi


Re: testing - syncing data timeline

Kurt Young
Hi,

You can merge the logic of #2 into #4; it will be much simpler.

Best,
Kurt


(In reply to Avi Levi, Wed, Dec 25, 2019 at 7:36 PM)

Re: testing - syncing data timeline

avilevi
I'm not sure I see how that is simpler. #2 is a per-day time window that emits the highest hourly count for that day. #4 is not a time window; it keeps a history of several days. If I put the logic of #2 into #4, I will need to manage it with timers, correct?
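
The timer-based variant asked about here could look roughly like the following. It is a framework-free Python model rather than Flink code: `process_hourly` plays the role of a keyed process function's per-element callback, and `advance_watermark` plays the role of the runtime firing event-time timers whose timestamp is at or below the watermark. The class and method names are hypothetical, and the sketch assumes the day's maximum should be emitted once, after the 3-hour allowed lateness has passed. That simplifies Flink's window semantics, where a window with allowed lateness fires at its end and again for late elements.

```python
import heapq
from collections import defaultdict

HOUR_MS = 60 * 60 * 1000
DAY_MS = 24 * HOUR_MS
ALLOWED_LATENESS_MS = 3 * HOUR_MS  # the 3 hours of lateness from the first message


class DailyMaxWithTimers:
    """Hypothetical model of a keyed process function replacing window #2.

    Hourly counts are buffered per day. The first count for a day registers an
    event-time "timer" at the day's last millisecond plus the allowed lateness.
    When the watermark reaches a timer, the day's maximum is emitted and its
    buffer is cleared, mirroring how Flink fires event-time timers whose
    timestamp is <= the current watermark.
    """

    def __init__(self, lateness_ms=ALLOWED_LATENESS_MS):
        self.lateness_ms = lateness_ms
        self.buffer = defaultdict(list)  # day_start -> buffered hourly counts ("state")
        self.timers = []                 # min-heap of (timer_ts, day_start)
        self.watermark = float("-inf")

    def process_hourly(self, hour_start, count):
        day_start = hour_start - hour_start % DAY_MS
        timer_ts = day_start + DAY_MS - 1 + self.lateness_ms
        if timer_ts <= self.watermark:
            return  # beyond the allowed lateness: the day was already emitted, drop it
        if day_start not in self.buffer:
            heapq.heappush(self.timers, (timer_ts, day_start))  # one timer per day
        self.buffer[day_start].append(count)

    def advance_watermark(self, watermark):
        """Fire every timer whose timestamp is <= the new watermark."""
        self.watermark = max(self.watermark, watermark)
        emitted = []
        while self.timers and self.timers[0][0] <= self.watermark:
            _, day_start = heapq.heappop(self.timers)
            emitted.append((day_start, max(self.buffer.pop(day_start))))
        return emitted


if __name__ == "__main__":
    op = DailyMaxWithTimers()
    op.process_hourly(0, 3)
    op.process_hourly(5 * HOUR_MS, 7)
    print(op.advance_watermark(DAY_MS))  # day 0 is still within its lateness
    print(op.advance_watermark(DAY_MS - 1 + ALLOWED_LATENESS_MS))
```

Registering one timer per day (rather than per record) keeps the number of timers small; the buffered counts are the only per-day state, and they are cleared when the timer fires.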

(In reply to Kurt Young, Thu, Dec 26, 2019 at 6:28 AM)

Re: testing - syncing data timeline

Kurt Young
Let's say you keep your #1, which does the hourly counting, and emit its results to the
new merged #2. The new #2 would first keep all hourly results in state, and also keep track of
whether it has already received all 24 results belonging to the same day. Once you have received
all 24 counts for the same day, you can start your logic. You can also decide what
kind of data you want to keep in state after that.
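
The count-based design described above might be sketched like this. It is a framework-free Python model, not Flink API: the class name, `history_days` (standing in for the "last X days" from the first message) and the comparison rule are hypothetical placeholders for the real business logic. It assumes days complete in chronological order, since the history is appended in completion order.

```python
from collections import deque

HOUR_MS = 60 * 60 * 1000
DAY_MS = 24 * HOUR_MS
HOURS_PER_DAY = 24


class MergedDailyLogic:
    """Hypothetical model of the merged #2/#4 operator.

    Hourly counts are buffered per day. Once all 24 counts for a day have
    arrived, each hour of that day is evaluated against the daily maxima of
    the previous `history_days` days, and then the day's own maximum is
    appended to that history. The placeholder rule flags an hour whose count
    exceeds every recent daily maximum.
    """

    def __init__(self, history_days):
        self.pending = {}                          # day_start -> {hour_start: count}
        self.history = deque(maxlen=history_days)  # recent daily maxima, oldest first

    def process_hourly(self, hour_start, count):
        day_start = hour_start - hour_start % DAY_MS
        hours = self.pending.setdefault(day_start, {})
        hours[hour_start] = count
        if len(hours) < HOURS_PER_DAY:
            return []  # day not complete yet: keep buffering, emit nothing
        del self.pending[day_start]
        baseline = max(self.history) if self.history else None
        results = [
            # (hour_start, count, flagged); nothing is flagged while history is empty
            (h, c, baseline is not None and c > baseline)
            for h, c in sorted(hours.items())
        ]
        self.history.append(max(hours.values()))
        return results


if __name__ == "__main__":
    op = MergedDailyLogic(history_days=7)
    for day in range(2):
        for hour in range(HOURS_PER_DAY):
            for h, c, flagged in op.process_hourly(day * DAY_MS + hour * HOUR_MS, hour):
                if flagged:
                    print("above recent daily maxima:", h, c)
```

One caveat of counting to 24: a one-hour tumbling window emits nothing for an hour without records, so under this sketch a day with an empty hour is never evaluated; how to handle that case (for example with an event-time timer) is left open.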

Best,
Kurt


(In reply to Avi Levi, Thu, Dec 26, 2019 at 1:14 PM)