I am trying to join two keyed streams in a particular way to get a combined stream. For example, let's call the two streams X and Y. The X stream contains:
(Key,Value)
(A,P) (A,Q) (A,R) (B,P) (C,P) (C,Q)
The Y stream contains:
(Key,Value,Flag1,Flag2)
(A,M1,0,0) (A,M2,0,0) (A,M3,1,0) (A,M4,0,0) (A,M5,1,0) (A,M6,0,1) (B,N1,0,0) (B,N2,1,0) (B,N3,0,1) (C,O1,1,0) (C,O2,0,1)
My objective is to join these two streams and get the combined value described below. From the X stream, I want the "Value" field aggregated per key. From the Y stream, I want the "Value" field aggregated per key and grouped by "Flag1", i.e., the output is a set of aggregated values (Value1 for rows with Flag1 = 0, Value2 for rows with Flag1 = 1). I want to join the two streams using a keyed window that fires only when the "Flag2" value for that key in the Y stream is "1". These flags exist only in the Y stream, not in the X stream. So my end result should look like:
(Key,Data,[Value1,Value2])
(A,(P#Q#R),[(M1#M2#M4#M6),(M3#M5)]) (B,P,[(N1#N3),(N2)]) (C,(P#Q),[(O2),(O1)])
The rows in both streams are timed such that, by the time a row with Flag2 = 1 arrives in stream Y (which marks the end of a session), all the corresponding rows in stream X have already arrived.
I tried to maintain state inside my join function to produce this output, but I don't know how to query the state or when to do it. Can anyone suggest a solution?
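One possible approach, sketched here rather than offered as the definitive answer: instead of a window, connect X and Y, key both by Key, and use a Flink `KeyedCoProcessFunction`. Its `processElement1` appends each X value to per-key state, and its `processElement2` appends each Y value to one of two per-key lists depending on Flag1. In that same `processElement2` call, when Flag2 is 1, it reads the accumulated state, emits the combined record, and clears the state. In other words, the state is queried at the moment the Flag2 = 1 element arrives. The block below simulates that logic in plain Java so it runs without Flink. `HashMap`s stand in for keyed state, and the names `SessionJoinSketch`, `onX`, `onY` and `runSample` are hypothetical. It also relies on the stated timing assumption that all X rows for a key arrive before that key's Flag2 = 1 row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink dependency) of the per-key state a
// KeyedCoProcessFunction could hold. The HashMaps stand in for keyed state.
public class SessionJoinSketch {
    private final Map<String, List<String>> xValues = new HashMap<>(); // X "Value" per key
    private final Map<String, List<String>> yFlag0 = new HashMap<>();  // Y "Value" with Flag1 == 0
    private final Map<String, List<String>> yFlag1 = new HashMap<>();  // Y "Value" with Flag1 == 1
    private final List<String> output = new ArrayList<>();

    // Role of processElement1: buffer the X value for its key.
    public void onX(String key, String value) {
        xValues.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Role of processElement2: buffer the Y value by Flag1; if Flag2 == 1,
    // this is the moment to read the state, emit the joined record and clear it.
    public void onY(String key, String value, int flag1, int flag2) {
        Map<String, List<String>> bucket = (flag1 == 1) ? yFlag1 : yFlag0;
        bucket.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        if (flag2 == 1) {
            output.add("(" + key + "," + group(xValues.get(key)) + ",["
                    + group(yFlag0.get(key)) + "," + group(yFlag1.get(key)) + "])");
            // The session for this key is over: drop its state so the next one starts empty.
            xValues.remove(key);
            yFlag0.remove(key);
            yFlag1.remove(key);
        }
    }

    // Formats a list as (v1#v2#...); a missing list becomes "()".
    private static String group(List<String> values) {
        return "(" + (values == null ? "" : String.join("#", values)) + ")";
    }

    public List<String> output() {
        return output;
    }

    // Replays the sample rows from the question: all X rows first (the timing
    // assumption), then the Y rows in the order listed.
    public static List<String> runSample() {
        SessionJoinSketch join = new SessionJoinSketch();
        String[][] x = {{"A", "P"}, {"A", "Q"}, {"A", "R"}, {"B", "P"}, {"C", "P"}, {"C", "Q"}};
        for (String[] r : x) {
            join.onX(r[0], r[1]);
        }
        String[][] y = {
            {"A", "M1", "0", "0"}, {"A", "M2", "0", "0"}, {"A", "M3", "1", "0"},
            {"A", "M4", "0", "0"}, {"A", "M5", "1", "0"}, {"A", "M6", "0", "1"},
            {"B", "N1", "0", "0"}, {"B", "N2", "1", "0"}, {"B", "N3", "0", "1"},
            {"C", "O1", "1", "0"}, {"C", "O2", "0", "1"}};
        for (String[] r : y) {
            join.onY(r[0], r[1], Integer.parseInt(r[2]), Integer.parseInt(r[3]));
        }
        return join.output();
    }

    public static void main(String[] args) {
        runSample().forEach(System.out::println);
    }
}
```

Running `main` prints one record per key, in the order the Flag2 = 1 rows arrive: `(A,(P#Q#R),[(M1#M2#M4#M6),(M3#M5)])`, `(B,(P),[(N1#N3),(N2)])` and `(C,(P#Q),[(O2),(O1)])`. The sketch always parenthesizes groups and always lists the Flag1 = 0 group first. In a real job, the two maps-per-side would become keyed state such as `ListState`, and `onX`/`onY` would become the bodies of `processElement1`/`processElement2`.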