Joining two aggregated streams

Udhay
I was trying to join two keyed streams in a particular way and get a combined stream.
For example, let's call the two streams X and Y.
The X stream contains:

(Key,Value)

(A,P)
(A,Q)
(A,R)
(B,P)
(C,P)
(C,Q)

The Y stream contains:

(Key,Value,Flag1,Flag2)

(A,M1,0,0)
(A,M2,0,0)
(A,M3,1,0)
(A,M4,0,0)
(A,M5,1,0)
(A,M6,0,1)
(B,N1,0,0)
(B,N2,1,0)
(B,N3,0,1)
(C,O1,1,0)
(C,O2,0,1)

My objective is to join these two streams and get the combined result described below. From the X stream, I want the "Value" field aggregated per key. From the Y stream, I want the "Value" field aggregated per key and grouped by "Flag1", i.e., the output is a set of aggregated values, one per Flag1 value. I want to join the two streams by maintaining a keyed window that is triggered only when the "Flag2" value of a particular key in the Y stream is "1". These flags exist only in the Y stream, not in the X stream. My end result should therefore look like this:

(Key,Value1,Value2)

(A,(P#Q#R),[(M1#M2#M4#M6),(M3#M5)])
(B,P,[(N1#N3),(N2)])
(C,(P#Q),[(O1),(O2)])
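To pin down the intended semantics, here is a minimal batch sketch in plain Python (not Flink code) that computes the desired result from the sample rows above. It assumes Flag1 only takes the values 0 and 1 and uses `#` as the separator, as in the post; `expected_join` is a hypothetical helper name. Grouping by Flag1 with the Flag1=0 group first reproduces the rows for A and B exactly; for C it yields `O2` before `O1`, which matches the listed output only if the two groups are treated as an unordered set.

```python
from collections import defaultdict

# Sample rows copied from the post.
X = [("A", "P"), ("A", "Q"), ("A", "R"), ("B", "P"), ("C", "P"), ("C", "Q")]
Y = [
    ("A", "M1", 0, 0), ("A", "M2", 0, 0), ("A", "M3", 1, 0),
    ("A", "M4", 0, 0), ("A", "M5", 1, 0), ("A", "M6", 0, 1),
    ("B", "N1", 0, 0), ("B", "N2", 1, 0), ("B", "N3", 0, 1),
    ("C", "O1", 1, 0), ("C", "O2", 0, 1),
]


def expected_join(x_rows, y_rows):
    """Hypothetical batch reference: per key, X values joined with '#',
    plus Y values grouped by Flag1 (assumed 0 or 1, Flag1=0 group first)."""
    x_by_key = defaultdict(list)
    for key, value in x_rows:
        x_by_key[key].append(value)

    y_by_key = defaultdict(lambda: {0: [], 1: []})
    for key, value, flag1, _flag2 in y_rows:
        y_by_key[key][flag1].append(value)

    result = []
    for key in sorted(y_by_key):
        groups = y_by_key[key]
        result.append((key, "#".join(x_by_key[key]),
                       ["#".join(groups[0]), "#".join(groups[1])]))
    return result


for row in expected_join(X, Y):
    print(row)
# ('A', 'P#Q#R', ['M1#M2#M4#M6', 'M3#M5'])
# ('B', 'P', ['N1#N3', 'N2'])
# ('C', 'P#Q', ['O2', 'O1'])
```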

The rows in each stream are timed such that, by the time Flag2 becomes 1 in stream Y (which marks the end of a session), all the rows in stream X have already arrived.

I tried to maintain state inside my join function to produce this output, but I don't know how to query the state or when to do it. Can anyone suggest a solution?


Re: Joining two aggregated streams

Aljoscha Krettek
Hi,

For this case, I would suggest implementing the join operation “by hand” using a CoProcessFunction: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html#low-level-joins. You would have the first stream on the first input and the second stream on the second input. Inside the function, keep in state whatever you want to emit when you see that flag, possibly in ListState. I would also suggest setting a cleanup timer so that state is cleaned up in case you never see the flag that triggers processing of your window.
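The pattern described above can be simulated outside Flink to show where the state is queried and when. The following is a plain-Python sketch, not Flink's API: the method names only echo `processElement1`, `processElement2` and `onTimer`, per-key dictionaries stand in for keyed `ListState`, and time is passed in explicitly in milliseconds. In a real job you would connect the two streams keyed by the same field and register the cleanup timer through `ctx.timerService()`. The class name `FlagTriggeredJoin`, the 60-second cleanup interval and the extra key `D` are assumptions for illustration. State is read only in the second-input handler, at the moment a Flag2 = 1 row arrives; the cleanup timer drops buffers for keys whose flag never comes.

```python
from collections import defaultdict


class FlagTriggeredJoin:
    """Plain-Python simulation of a keyed two-input process function.

    NOT Flink's API: method names only mirror processElement1/2 and onTimer,
    and per-key dicts stand in for keyed ListState.
    """

    def __init__(self, cleanup_after_ms):
        self.cleanup_after_ms = cleanup_after_ms
        self.x_state = defaultdict(list)  # key -> buffered X values
        self.y_state = defaultdict(list)  # key -> buffered (value, flag1) pairs
        self.timers = {}                  # key -> cleanup deadline in ms
        self.emitted = []                 # stands in for the output Collector

    def process_element1(self, key, value, now_ms):
        # First input (stream X): only buffer the value.
        self.x_state[key].append(value)
        self._ensure_cleanup_timer(key, now_ms)

    def process_element2(self, key, value, flag1, flag2, now_ms):
        # Second input (stream Y): buffer; on Flag2 == 1 read state, emit, clear.
        self.y_state[key].append((value, flag1))
        self._ensure_cleanup_timer(key, now_ms)
        if flag2 == 1:
            ys = self.y_state[key]
            flag1_zero = "#".join(v for v, f in ys if f == 0)
            flag1_one = "#".join(v for v, f in ys if f == 1)
            self.emitted.append((key, "#".join(self.x_state[key]), [flag1_zero, flag1_one]))
            self._clear(key)

    def on_timer(self, now_ms):
        # Cleanup: drop state of keys whose Flag2 == 1 row never arrived in time.
        for key, deadline in list(self.timers.items()):
            if deadline <= now_ms:
                self._clear(key)

    def _ensure_cleanup_timer(self, key, now_ms):
        # One cleanup timer per key, registered when the key is first buffered.
        self.timers.setdefault(key, now_ms + self.cleanup_after_ms)

    def _clear(self, key):
        self.x_state.pop(key, None)
        self.y_state.pop(key, None)
        self.timers.pop(key, None)


# Sample rows from the post; key "D" is hypothetical and never sees Flag2 == 1.
X = [("A", "P"), ("A", "Q"), ("A", "R"), ("B", "P"), ("C", "P"), ("C", "Q"), ("D", "Z")]
Y = [
    ("A", "M1", 0, 0), ("A", "M2", 0, 0), ("A", "M3", 1, 0),
    ("A", "M4", 0, 0), ("A", "M5", 1, 0), ("A", "M6", 0, 1),
    ("B", "N1", 0, 0), ("B", "N2", 1, 0), ("B", "N3", 0, 1),
    ("C", "O1", 1, 0), ("C", "O2", 0, 1),
]

join = FlagTriggeredJoin(cleanup_after_ms=60_000)
now = 0
for key, value in X:  # per the post, X rows arrive before the session-ending flag
    join.process_element1(key, value, now)
    now += 1
for key, value, flag1, flag2 in Y:
    join.process_element2(key, value, flag1, flag2, now)
    now += 1
join.on_timer(now + 60_000)  # fire cleanup past every registered deadline

for row in join.emitted:
    print(row)
```

Because state is emitted and cleared on the flag, this relies on the timing guarantee from the original question: an X row that arrives after its key's Flag2 = 1 row would start a new buffer and would eventually be dropped by the cleanup timer.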

Best,
Aljoscha

On 5. Jul 2017, at 11:53, Udhay <[hidden email]> wrote:

I was trying to join two keyed streams in a particular way and get a combined
stream.
For example:
Lets say I call the two streams as X and Y.
The X stream contains:

(Key,Value)

(A,P)
(A,Q)
(A,R)
(B,P)
(C,P)
(C,Q)

The Y stream contains:

(Key,Value,Flag1,Flag2)

(A,M1,0,0)
(A,M2,0,0)
(A,M3,1,0)
(A,M4,0,0)
(A,M5,1,0)
(A,M6,0,1)
(B,N1,0,0)
(B,N2,1,0)
(B,N3,0,1)
(C,O1,1,0)
(C,O2,0,1)

My objective is to join these two streams and get the combined value as
described. I want a keywise aggregated data of "Value" field from the X
stream. In the Y stream I want a keywise aggregation of "Value" field based
on "Flag1" i.e., the output will be set of aggregated values. I want to join
these two streams by maintaining a keyed window and that window gets
triggered only when the "Flag2" value of a particular key in the Y stream is
"1". These flag values are available only with Y stream and not with the X
stream. Thus my end result should look like:

(Key,Value1,Value2)

(A,(P#Q#R),[(M1#M2#M4#M6),(M3#M5)])
(B,P,[(N1#N3),(N2)])
(C,(P#Q),[(O1),(O2)])

The timings for each of the rows in each stream are such that by the time
the Flag2 value is 1 in stream Y (indicates some sort of end of a session),
all the rows in stream X are also already available.

I tried to maintains state value inside my join function to get to the
output. But I dont know how to query the state value and when to do it. Can
anyone please suggest some solution?





--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Joining-two-aggregated-streams-tp14123.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re: Joining two aggregated streams

Udhay
Hi

Thanks for your suggestion. I'll try it. :)

-Udhay.