Enriching DataStream using static DataSet in Flink streaming


Enriching DataStream using static DataSet in Flink streaming

vijay kansal
Hi All

I am writing a Flink streaming program in which I need to enrich a DataStream of user events using some static data set (information base, IB).

For example, let's say we have a static data set of buyers and an incoming clickstream of events. For each event we want to add a boolean flag indicating whether the user who performed the event is a buyer or not.

An ideal way to achieve this would be to partition the incoming stream by user id, have the buyers set available as a DataSet also partitioned by user id, and then look up each event of the stream in that DataSet.

Since Flink does not allow using DataSets in a streaming program, how can I achieve the above?

Another option could be to use managed operator state to store the buyers set, but how can I keep this state distributed by user id so as to avoid network I/O on individual event lookups? In the case of the memory state backend, does the state remain distributed by some key, or is it replicated across all operator subtasks?

What is the right design pattern to achieve this enrichment requirement in a Flink streaming program?


Thanks

Vijay Kansal
Software Development Engineer
LimeRoad
Re: Enriching DataStream using static DataSet in Flink streaming

Fabian Hueske-2
Hi,

This type of application is not well supported by Flink yet. The missing feature is on the roadmap and is called Side Inputs [1].
There are (at least) two alternatives, but both have some drawbacks:

1) Ingest the static data set as a regular DataStream, keyBy both the static and the actual stream, connect them, and join them in a CoFlatMapFunction or CoProcessFunction. The problem with this approach is that you cannot control in a Co*Function which input to consume first. Therefore, you may receive records from the stream you want to enrich before you have consumed all records of the static data set. You can deal with this by either buffering the records to enrich in state and joining them once the corresponding static record has arrived, or by discarding them (see the first sketch below).
2) Load the static data set in the open() method of a RichFlatMapFunction. The problem here is that the data is not partitioned by user id; each parallel task has to hold a full copy of the static data set (see the second sketch below).
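
For 1), here is a minimal Java sketch of the keyed CoProcessFunction, assuming hypothetical POJOs Buyer, Click, and EnrichedClick (not from the original thread) and buffering clicks that arrive before the buyer record for that user:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical types, only to make the sketch self-contained:
class Buyer { public String userId; }
class Click { public String userId; }
class EnrichedClick {
    public Click click;
    public boolean isBuyer;
    public EnrichedClick(Click click, boolean isBuyer) { this.click = click; this.isBuyer = isBuyer; }
}

class BuyerEnricher extends CoProcessFunction<Buyer, Click, EnrichedClick> {

    private transient ValueState<Boolean> isBuyer;   // set once the buyer record for this user id has arrived
    private transient ListState<Click> buffered;     // clicks seen before the static side was consumed

    @Override
    public void open(Configuration parameters) {
        isBuyer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("isBuyer", Boolean.class));
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered", Click.class));
    }

    @Override
    public void processElement1(Buyer buyer, Context ctx, Collector<EnrichedClick> out) throws Exception {
        // static side: remember that this user is a buyer and flush clicks that arrived too early
        isBuyer.update(true);
        for (Click c : buffered.get()) {
            out.collect(new EnrichedClick(c, true));
        }
        buffered.clear();
    }

    @Override
    public void processElement2(Click click, Context ctx, Collector<EnrichedClick> out) throws Exception {
        if (isBuyer.value() != null) {
            out.collect(new EnrichedClick(click, true));
        } else {
            // buffer until the static side has been read; in a real job you would also register
            // a timer to eventually emit buffered clicks with flag=false for non-buyers
            buffered.add(click);
        }
    }
}

Both inputs are keyed on user id before the function is applied, e.g. buyerStream.connect(clickStream).keyBy(b -> b.userId, c -> c.userId).process(new BuyerEnricher()).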
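
For 2), a sketch of the full-copy variant with a RichFlatMapFunction, reusing the hypothetical Click/EnrichedClick types above; loadBuyerIds() is a placeholder for however the IB would actually be read (file, database, service):

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

class BuyerLookupFlatMap extends RichFlatMapFunction<Click, EnrichedClick> {

    private transient Set<String> buyerIds;   // every parallel task holds a full copy of the buyer ids

    @Override
    public void open(Configuration parameters) throws Exception {
        buyerIds = loadBuyerIds();            // load the static data set once per task
    }

    @Override
    public void flatMap(Click click, Collector<EnrichedClick> out) {
        // local lookup, no network I/O per event, but the static data is not partitioned
        out.collect(new EnrichedClick(click, buyerIds.contains(click.userId)));
    }

    private Set<String> loadBuyerIds() throws Exception {
        // placeholder: read the buyer ids from wherever the static data set lives
        return new HashSet<>();
    }
}

The enrichment is then just clickStream.flatMap(new BuyerLookupFlatMap()).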
