Implementing a “join” between a DataStream and a “set of rules”

Implementing a “join” between a DataStream and a “set of rules”

Sandybayev, Turar (CAI - Atlanta)

Hi,

What is the best-practice recommendation for the following use case? We need to match a stream against a set of “rules”, which are essentially a Flink DataSet concept. Updates to this “rules set” are possible but not frequent. Each stream event must be checked against all the records in the “rules set”, and each match produces one or more events into a sink. The number of records in a rule set is in the six-digit range.

Currently we're simply loading the rules into a local List and using flatMap over the incoming DataStream. Inside flatMap, we just iterate over the list, comparing each event to each rule.
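
For concreteness, here is a minimal sketch of what we do (Event, Rule, Alert and the matches()/apply() methods are placeholders for our actual types):

import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Sketch of the current approach: a full scan over the rule list per event.
public class RuleMatcher implements FlatMapFunction<Event, Alert> {

    private final List<Rule> rules; // shipped with the function at job start, never updated

    public RuleMatcher(List<Rule> rules) {
        this.rules = rules;
    }

    @Override
    public void flatMap(Event event, Collector<Alert> out) {
        for (Rule rule : rules) {               // O(|rules|) work per event
            if (rule.matches(event)) {
                out.collect(rule.apply(event)); // one output event per match
            }
        }
    }
}

The list is serialized along with the function when the job is submitted, which is why updating it later is awkward.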

 

To speed up the iteration, we can also split the list into several batches, essentially creating a list of lists, and create a separate thread to iterate over each sub-list (using Futures in either Java or Scala).
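
For illustration, a sketch of that batched variant inside the flatMap above, using Java's CompletableFuture (the pool size and the pre-built ruleBatches list of lists are illustrative choices):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Inside flatMap: scan each rule sub-list on its own thread, then gather
// the matches back on the operator thread.
ExecutorService pool = Executors.newFixedThreadPool(4); // ideally created once, e.g. in open()

List<CompletableFuture<List<Alert>>> futures = new ArrayList<>();
for (List<Rule> batch : ruleBatches) {
    futures.add(CompletableFuture.supplyAsync(() -> {
        List<Alert> matches = new ArrayList<>();
        for (Rule rule : batch) {
            if (rule.matches(event)) {
                matches.add(rule.apply(event));
            }
        }
        return matches;
    }, pool));
}

for (CompletableFuture<List<Alert>> future : futures) {
    future.join().forEach(out::collect); // collect() is only ever called from the operator thread
}

Whether doing this inside an operator is advisable at all is exactly question 2 below.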

 

Questions:

1. Is there a better way to do this kind of join?

2. If not, is it safe to add additional parallelism by creating new threads inside each flatMap operation, on top of what Flink is already doing?

 

Thanks in advance!

Turar

 


Re: Implementing a “join” between a DataStream and a “set of rules”

Garvit Sharma
Hi,

For the above use case, you should do the following (sketched below):

1. Convert your DataStream into a KeyedStream by defining a key that will be used to validate events against your rules.
2. Do the same for the rules stream.
3. Join the two keyed streams using Flink's connect operator.
4. Store the rules in Flink's internal state, i.e. Flink's managed keyed state.
5. Validate the data coming in on the DataStream against the managed keyed state.

Refer to [1] [2] for more details.
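A minimal sketch of those steps, assuming events and rules DataStreams and hypothetical Event/Rule/Alert types with getKey()/matches()/apply() methods:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

DataStream<Alert> alerts = events
    .keyBy(Event::getKey)                      // 1. key the event stream
    .connect(rules.keyBy(Rule::getKey))        // 2. + 3. key the rule stream and connect
    .flatMap(new RichCoFlatMapFunction<Event, Rule, Alert>() {

        private transient ListState<Rule> ruleState; // 4. managed keyed state

        @Override
        public void open(Configuration parameters) {
            ruleState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("rules", Rule.class));
        }

        @Override
        public void flatMap1(Event event, Collector<Alert> out) throws Exception {
            for (Rule rule : ruleState.get()) {  // 5. validate against the stored rules
                if (rule.matches(event)) {
                    out.collect(rule.apply(event));
                }
            }
        }

        @Override
        public void flatMap2(Rule rule, Collector<Alert> out) throws Exception {
            ruleState.add(rule); // appends only; replacing a rule would need e.g. MapState
        }
    });

One caveat: with keyed state, an event is only checked against the rules that share its key. Checking every event against all rules, as the original question requires, needs the rules to be broadcast instead.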




--

Garvit Sharma
github.com/garvitlnmiit/

Nobody is a scholar by birth; it is only hard work and strong determination that make one a master.

Re: Implementing a “join” between a DataStream and a “set of rules”

Amit Jain
In reply to this post by Sandybayev, Turar (CAI - Atlanta)
Hi Sandybayev,

As things currently stand, Flink does not provide a built-in solution for this use case. However, there is an open FLIP [1] [2] that was created to address it.

I can see that in your current approach you are not able to update the rule set data. You could make it updatable by building a DataStream around changelogs stored in a message queue or a distributed file system. Alternatively, you can store the rule set in an external system and query it for incoming keys from Flink (see the sketch after the links below).

[1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
[2]: https://issues.apache.org/jira/browse/FLINK-6131
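
A sketch of the second option using Flink's Async I/O; RuleServiceClient and its fetchRules() call are hypothetical stand-ins for whatever external store (cache, database, ...) holds the rules:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class RuleLookup extends RichAsyncFunction<Event, Alert> {

    private transient RuleServiceClient client; // hypothetical async client

    @Override
    public void open(Configuration parameters) {
        client = new RuleServiceClient();
    }

    @Override
    public void asyncInvoke(Event event, ResultFuture<Alert> resultFuture) {
        CompletableFuture<List<Rule>> lookup = client.fetchRules(event.getKey());
        lookup.thenAccept(rules -> {
            List<Alert> alerts = new ArrayList<>();
            for (Rule rule : rules) {
                if (rule.matches(event)) {
                    alerts.add(rule.apply(event));
                }
            }
            resultFuture.complete(alerts); // completes this event's async slot
        });
    }
}

// Wiring: at most 100 in-flight lookups, 5 s timeout per lookup.
DataStream<Alert> alerts =
    AsyncDataStream.unorderedWait(events, new RuleLookup(), 5, TimeUnit.SECONDS, 100);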


Re: Implementing a “join” between a DataStream and a “set of rules”

Aljoscha Krettek
Hi,

have a look at the new broadcast state feature: it was designed for exactly this kind of use case, matching a stream against a slowly changing set of rules.

Best,
Aljoscha


Re: Implementing a “join” between a DataStream and a “set of rules”

Sandybayev, Turar (CAI - Atlanta)
In reply to this post by Garvit Sharma

Thanks, Garvit, for your suggestion!

 


Re: Implementing a “join” between a DataStream and a “set of rules”

Sandybayev, Turar (CAI - Atlanta)
In reply to this post by Aljoscha Krettek

Hi Aljoscha,

 

Thank you, this seems like a match for this use case. Am I understanding correctly that, since only the MemoryStateBackend is available for broadcast state, the maximum amount possible is 5 MB?

If I use Flink's state mechanism for storing rules, I will still need to iterate through all the rules inside a flatMap, and there is no higher-level join mechanism I can employ, right? Is there any downside to trying to parallelize that iteration inside my user flatMap operation?

 

Thanks

Turar

 


Re: Implementing a “join” between a DataStream and a “set of rules”

Sandybayev, Turar (CAI - Atlanta)
In reply to this post by Amit Jain
Hi Amit,

In my current approach, the idea for updating rule set data was to have some kind of "control" stream that triggers an update to a local data structure, or a "control" event within the main data stream that triggers the same.

Using an external system like a cache or database is also an option, but that still requires some kind of trigger to reload the rule set, or a single rule, whenever it is updated.

Others have suggested using Flink managed state, but I'm still not sure whether that is a generally recommended approach in this scenario, as it seems like it was meant more for windowing-type processing?

Thanks,
Turar


Re: Implementing a “join” between a DataStream and a “set of rules”

Fabian Hueske-2
Hi Turar,

Managed state is a general concept in Flink's DataStream API and is not specifically designed for windows (although windows use it internally).
I'd recommend the broadcast state that Aljoscha proposed. It was specifically designed for these use cases.

It is true that this state is currently maintained in memory, but it is not bounded by 5 MB; rather, it is bounded by the size of your heap (e.g., hundreds of MBs or GBs) if you configure a state backend that writes to a distributed file system (e.g., FsStateBackend or RocksDBStateBackend). There is some ongoing work to also support broadcast state in RocksDB.
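
A minimal sketch of the broadcast state approach (Flink 1.5+; the events and rules streams, the Event/Rule/Alert types, and Rule.getId() are placeholders):

import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
    "rules", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Rule.class));

// Every parallel instance of the downstream operator receives all rules.
BroadcastStream<Rule> broadcastRules = rules.broadcast(ruleStateDescriptor);

DataStream<Alert> alerts = events
    .connect(broadcastRules)
    .process(new BroadcastProcessFunction<Event, Rule, Alert>() {

        @Override
        public void processElement(Event event, ReadOnlyContext ctx,
                                   Collector<Alert> out) throws Exception {
            // Check the event against every rule currently in broadcast state.
            for (Map.Entry<String, Rule> entry :
                    ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
                if (entry.getValue().matches(event)) {
                    out.collect(entry.getValue().apply(event));
                }
            }
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx,
                                            Collector<Alert> out) throws Exception {
            // Insert or replace a rule; a deletion marker could call remove() instead.
            ctx.getBroadcastState(ruleStateDescriptor).put(rule.getId(), rule);
        }
    });

You still iterate over all rules per event, but scaling comes from increasing the operator's parallelism rather than from spawning your own threads inside the operator.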

Best, Fabian


