Co-relate two streams


Abhinav Sharma
Hi,

How can I correlate two streams of different types in Flink?
Scenario: In stream1, I have data in a POJO with a field user. In stream2, I have data in a different POJO that also contains the field user. (Other than the user field, the two POJOs have no fields in common.)

I want to relate the two streams such that, for every event in stream1, I collect the events from stream2 that have the same user. Both stream1 and stream2 are unbounded.

I tried using
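import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

stream1.connect(stream2).process(new CoProcessFunction<Type1, Type2, Type2>() {
    // Plain field, not Flink state: it only remembers the last user seen on stream1
    private String user;

    @Override
    public void processElement1(Type1 inp, Context ctx, Collector<Type2> out) {
        user = inp.getUser();
    }

    @Override
    public void processElement2(Type2 inp, Context ctx, Collector<Type2> out) {
        // Emits only if this event matches the most recent stream1 user
        if (user != null && user.equals(inp.getUser())) {
            out.collect(inp);
        }
    }
});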

But this only works if both elements arrive at the same time.

How can I also collect matches against events that arrived earlier, i.e. keep a history? Is using ListState required?
Is there a better way to do this in Flink?
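One way to keep history, along the lines of the ListState idea above, is to key both streams by user and buffer each side in keyed ListState, matching every new event against everything seen so far on the other side. This is a sketch under stated assumptions, not an official recipe: it assumes Flink 1.x (1.12 or later, for `executeAndCollect`); `Event1`/`Event2` are hypothetical stand-ins for `Type1`/`Type2`, `payload` is an invented field, `UserStateJoin`/`HistoryJoin` are illustrative names, and the bounded `fromElements` inputs exist only for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;

public class UserStateJoin {

    // Hypothetical POJOs standing in for Type1 and Type2; they share only `user`.
    public static class Event1 {
        public String user;
        public String payload;
        public Event1() {}
        public Event1(String user, String payload) { this.user = user; this.payload = payload; }
    }

    public static class Event2 {
        public String user;
        public String payload;
        public Event2() {}
        public Event2(String user, String payload) { this.user = user; this.payload = payload; }
    }

    // Keyed by user: each ListState holds every event seen so far for the current user,
    // so a new event is matched against the full history of the other stream.
    public static class HistoryJoin extends KeyedCoProcessFunction<String, Event1, Event2, String> {
        private transient ListState<Event1> seen1;
        private transient ListState<Event2> seen2;

        @Override
        public void open(Configuration parameters) {
            seen1 = getRuntimeContext().getListState(new ListStateDescriptor<>("seen1", Event1.class));
            seen2 = getRuntimeContext().getListState(new ListStateDescriptor<>("seen2", Event2.class));
        }

        @Override
        public void processElement1(Event1 e1, Context ctx, Collector<String> out) throws Exception {
            for (Event2 e2 : seen2.get()) {   // match against earlier stream2 events
                out.collect(e1.payload + "|" + e2.payload);
            }
            seen1.add(e1);                     // remember for later stream2 events
        }

        @Override
        public void processElement2(Event2 e2, Context ctx, Collector<String> out) throws Exception {
            for (Event1 e1 : seen1.get()) {   // match against earlier stream1 events
                out.collect(e1.payload + "|" + e2.payload);
            }
            seen2.add(e2);                     // remember for later stream1 events
        }
    }

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Event1> stream1 = env.fromElements(new Event1("alice", "a1"), new Event1("bob", "b1"));
        DataStream<Event2> stream2 = env.fromElements(
            new Event2("alice", "x1"), new Event2("alice", "x2"), new Event2("carol", "y1"));

        DataStream<String> joined = stream1
            .connect(stream2)
            .keyBy(e1 -> e1.user, e2 -> e2.user)   // both sides partitioned by user
            .process(new HistoryJoin());

        List<String> result = new ArrayList<>();
        try (CloseableIterator<String> it = joined.executeAndCollect()) {
            it.forEachRemaining(result::add);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

Because both sides are buffered, the result does not depend on which stream delivers a user's events first. Nothing is ever removed from this keyed state, though, so on unbounded input it grows without bound; in practice you would likely add a state TTL (`StateTtlConfig`) or timers to expire old entries.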


Requesting help,
Abhinav

Re: Co-relate two streams

Arvid Heise-4
Hi Abhinav,

It sounds like you want to implement a join [1]. Usually you would use a window and correlate the data of the two streams only within that time frame. If you cannot use a time window, you can use global windows, but note that the state will then grow indefinitely.
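A minimal sketch of such a windowed join, assuming Flink 1.x (1.12 or later), event time taken from a hypothetical `ts` field in epoch milliseconds, and an arbitrarily chosen 5-minute tumbling window. `Event1`, `Event2`, `payload`, and `UserWindowJoin` are illustrative names, not taken from the thread.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.CloseableIterator;

public class UserWindowJoin {

    // Hypothetical POJOs; `ts` is an assumed event-time field in epoch milliseconds.
    public static class Event1 {
        public String user;
        public String payload;
        public long ts;
        public Event1() {}
        public Event1(String user, String payload, long ts) { this.user = user; this.payload = payload; this.ts = ts; }
    }

    public static class Event2 {
        public String user;
        public String payload;
        public long ts;
        public Event2() {}
        public Event2(String user, String payload, long ts) { this.user = user; this.payload = payload; this.ts = ts; }
    }

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event1> stream1 = env
            .fromElements(new Event1("alice", "a1", 1_000L), new Event1("alice", "a2", 400_000L))
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Event1>forMonotonousTimestamps()
                .withTimestampAssigner((e, previous) -> e.ts));
        DataStream<Event2> stream2 = env
            .fromElements(new Event2("alice", "x1", 2_000L), new Event2("bob", "y1", 3_000L))
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Event2>forMonotonousTimestamps()
                .withTimestampAssigner((e, previous) -> e.ts));

        // Pairs are joined only if they share a user AND fall into the same 5-minute window;
        // a window's state can be dropped once the watermark passes its end.
        DataStream<String> joined = stream1.join(stream2)
            .where(e1 -> e1.user)
            .equalTo(e2 -> e2.user)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .apply(new JoinFunction<Event1, Event2, String>() {
                @Override
                public String join(Event1 e1, Event2 e2) {
                    return e1.payload + "|" + e2.payload;
                }
            });

        List<String> result = new ArrayList<>();
        try (CloseableIterator<String> it = joined.executeAndCollect()) {
            it.forEachRemaining(result::add);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

Here `a1` and `x1` share the window [0 s, 300 s) and are joined, while `a2` falls into a later window with no matching stream2 event. If matches may straddle window boundaries, a sliding window or an interval join (`KeyedStream.intervalJoin`) are possible alternatives.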

If one of the sources is small, also consider the broadcast state pattern [2].
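A sketch of the broadcast state pattern, assuming (for illustration only) that stream2 is the small side and that keeping just the latest stream2 payload per user is acceptable. Every parallel instance receives all stream2 events, so stream1 does not need to be keyed. `UserBroadcastJoin`, `BY_USER`, `Enrich`, and `payload` are hypothetical names; Flink 1.x (1.12 or later) is assumed.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;

public class UserBroadcastJoin {

    // Hypothetical POJOs; Event2 is assumed to be the small side.
    public static class Event1 {
        public String user;
        public String payload;
        public Event1() {}
        public Event1(String user, String payload) { this.user = user; this.payload = payload; }
    }

    public static class Event2 {
        public String user;
        public String payload;
        public Event2() {}
        public Event2(String user, String payload) { this.user = user; this.payload = payload; }
    }

    // Broadcast state: user -> latest Event2 payload, replicated to every parallel instance.
    static final MapStateDescriptor<String, String> BY_USER =
        new MapStateDescriptor<>("byUser", Types.STRING, Types.STRING);

    public static class Enrich extends BroadcastProcessFunction<Event1, Event2, String> {
        @Override
        public void processElement(Event1 e1, ReadOnlyContext ctx, Collector<String> out) throws Exception {
            // Read-only lookup on the non-broadcast side; "none" if no Event2 has arrived for this user.
            String match = ctx.getBroadcastState(BY_USER).get(e1.user);
            out.collect(e1.payload + "|" + (match == null ? "none" : match));
        }

        @Override
        public void processBroadcastElement(Event2 e2, Context ctx, Collector<String> out) throws Exception {
            // Only the broadcast side may modify broadcast state.
            ctx.getBroadcastState(BY_USER).put(e2.user, e2.payload);
        }
    }

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Event1> stream1 = env.fromElements(new Event1("alice", "a1"), new Event1("bob", "b1"));
        DataStream<Event2> stream2 = env.fromElements(new Event2("alice", "x1"));

        BroadcastStream<Event2> broadcast = stream2.broadcast(BY_USER);
        DataStream<String> joined = stream1.connect(broadcast).process(new Enrich());

        List<String> result = new ArrayList<>();
        try (CloseableIterator<String> it = joined.executeAndCollect()) {
            it.forEachRemaining(result::add);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

Flink does not order the broadcast input relative to the other input, so a stream1 event that arrives before its stream2 counterpart yields `none` in this sketch; matching such events later would require buffering them in extra state.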

Note that if your application only does standard relational algebra, I'd recommend the Table API/SQL, which will produce faster applications [3].
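For comparison, a sketch of the same user-based join expressed in Flink SQL, assuming Flink 1.x (1.12 or later) with the Table API Java bridge and a planner on the classpath. The POJO fields become columns, and `user` is quoted with backticks because USER is a reserved keyword in Flink SQL. `UserSqlJoin`, `Event1`, `Event2`, `payload`, and the view names `s1`/`s2` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class UserSqlJoin {

    // Hypothetical POJOs; public fields are mapped to table columns.
    public static class Event1 {
        public String user;
        public String payload;
        public Event1() {}
        public Event1(String user, String payload) { this.user = user; this.payload = payload; }
    }

    public static class Event2 {
        public String user;
        public String payload;
        public Event2() {}
        public Event2(String user, String payload) { this.user = user; this.payload = payload; }
    }

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Event1> stream1 = env.fromElements(new Event1("alice", "a1"), new Event1("bob", "b1"));
        DataStream<Event2> stream2 = env.fromElements(
            new Event2("alice", "x1"), new Event2("alice", "x2"), new Event2("carol", "y1"));

        // Register both streams as views so SQL can refer to them.
        tEnv.createTemporaryView("s1", stream1);
        tEnv.createTemporaryView("s2", stream2);

        // Regular (unwindowed) inner join on the shared user column.
        Table joined = tEnv.sqlQuery(
            "SELECT s1.payload AS p1, s2.payload AS p2 FROM s1 JOIN s2 ON s1.`user` = s2.`user`");

        List<String> result = new ArrayList<>();
        try (CloseableIterator<Row> it = joined.execute().collect()) {
            while (it.hasNext()) {
                Row row = it.next();
                result.add(row.getField(0) + "|" + row.getField(1));
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

This is a regular join, so, like the global-window option, it keeps both inputs in state; a time-windowed or interval join in SQL would bound that state.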


On Wed, Feb 24, 2021 at 11:14 AM Abhinav Sharma <[hidden email]> wrote: