Process stream multiple time with different KeyBy

Process stream multiple time with different KeyBy

Lehuede sebastien
Hi all,

I'm currently working on a Flink application where I match events against a set of rules. At first I wanted to dynamically create one stream per event category (events are JSON formatted and each contains a field like "category":"foo"), but I'm blocked by the fact that streams cannot be created at runtime.

So, one solution for me is to use a single Kafka topic and then the "keyBy" operator to match events with "category":"foo" against rules that also contain "category":"foo" in their specification.

Now I have some cases where events and rules have one category and one subcategory. At this point I'm not sure about the keyBy operator's behavior.

Example :
  • Events have : "category":"foo" AND "subcategory":"bar"
  • Rule1 specification has : "category":"foo" AND "subcategory":"bar"   
  • Rule2 specification has : "category":"foo"
  • Rule3 specification has : "category":"bar"
In this case, my events need to be matched against Rule1, Rule2 and Rule3. 

If I understand correctly: if I apply a multi-key keyBy() on the "category" and "subcategory" fields and then apply two single-key keyBy() on the "category" field, will my events be consumed by the first keyBy() operator, with no events streamed to the operators after it?

Is there any way to process the same stream once with a multi-key keyBy() and again with a single-key keyBy()?

Thanks !
Sébastien.  

Re: Process stream multiple time with different KeyBy

Eduardo Winpenny Tejedor
Hi Sebastien,

Without being entirely sure of what your use case/end goal is, I'll tell
you (some of) the options Flink provides for defining a flow.

If your use case is to apply the same rule to each of your "swimlanes"
of data (one with category=foo AND subcategory=bar, another with
category=foo and another with category=bar) you can do this by
implementing your own org.apache.flink.api.java.functions.KeySelector
and passing it to the keyBy function. You'll just need to return a
different key for each of your rules and the data will separate into
the appropriate "swimlane".
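As a minimal sketch of the key-building logic such a KeySelector could return from getKey() (the Event shape and the "|" separator are hypothetical; the Flink wiring is omitted so the snippet stays self-contained):

```java
// Sketch of the key logic inside a hypothetical KeySelector<Event, String>.
// A composite key sends category+subcategory events to their own swimlane,
// while events without a subcategory fall into a category-only lane.
public class SwimlaneKey {
    static String keyFor(String category, String subcategory) {
        if (subcategory == null || subcategory.isEmpty()) {
            return category;                 // category-only lane
        }
        return category + "|" + subcategory; // category+subcategory lane
    }

    public static void main(String[] args) {
        System.out.println(keyFor("foo", "bar")); // foo|bar
        System.out.println(keyFor("foo", null));  // foo
    }
}
```

In a real job, this method body would sit inside the KeySelector passed to keyBy, with the event's JSON fields extracted beforehand.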

If your use case is to apply different rules to each swimlane then you
can write a ProcessFunction that redirects elements to different *side
outputs*. You can then apply different operations to each side output.
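The routing decision inside such a ProcessFunction can be sketched as plain Java (the lane names are hypothetical placeholders for Flink OutputTag instances, and the actual emission would go through the function's Context):

```java
// Sketch of the per-element routing a ProcessFunction could perform before
// emitting to a side output. Lane names stand in for OutputTag instances.
public class SideOutputRouter {
    static String tagFor(String category, String subcategory) {
        if ("foo".equals(category) && "bar".equals(subcategory)) {
            return "foo-bar-lane"; // rules on category AND subcategory
        }
        if ("foo".equals(category)) {
            return "foo-lane";     // rules on category=foo only
        }
        if ("bar".equals(category)) {
            return "bar-lane";     // rules on category=bar only
        }
        return "other-lane";       // everything else
    }

    public static void main(String[] args) {
        System.out.println(tagFor("foo", "bar")); // foo-bar-lane
        System.out.println(tagFor("foo", null));  // foo-lane
    }
}
```

Note that routing like this is exclusive (each element lands in one lane), but a ProcessFunction can also emit the same element to several tags if an event should flow down more than one lane.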

Your application could get tricky to evolve if the number of swimlanes
or the operators are meant to change over time; you'd have to be
careful about how the existing state fits into your new flows.

Regards,
Eduardo

On Mon, Feb 17, 2020 at 7:06 PM Lehuede sebastien <[hidden email]> wrote:


Re: Process stream multiple time with different KeyBy

Theo
Hi Sebastien, 

I'd also highly recommend a recent Flink blog post to you, where exactly this question is answered in quite some detail:

https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html

Best regards
Theo 



Re: Process stream multiple time with different KeyBy

Till Rohrmann
Hi Sébastien,

there is always the possibility to reuse a stream. Given a DataStream<Element> input, you can do the following:

KeyedStream<Element, String> a = input.keyBy(x -> f(x)); // assuming f and g return String keys
KeyedStream<Element, String> b = input.keyBy(x -> g(x));

This gives you two differently partitioned streams a and b.

If you want to evaluate every event against the full set of rules, then you could take a look at Flink Broadcast State Pattern [1]. It allows you to broadcast a stream of rules to all operators of a keyed input stream.
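With the broadcast approach, each event is checked against the full rule set rather than being routed to a single lane. A minimal plain-Java sketch of that per-event check (the Rule holder and the "null field means unconstrained" convention are hypothetical; in a real job this logic would live in the processElement of a KeyedBroadcastProcessFunction):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of matching one event against every broadcast rule.
public class BroadcastMatch {
    static final class Rule {
        final String name, category, subcategory;
        Rule(String name, String category, String subcategory) {
            this.name = name;
            this.category = category;
            this.subcategory = subcategory;
        }
        // A null field means the rule does not constrain that field.
        boolean matches(String cat, String sub) {
            return (category == null || category.equals(cat))
                && (subcategory == null || subcategory.equals(sub));
        }
    }

    static List<String> matchingRules(List<Rule> rules, String cat, String sub) {
        List<String> hits = new ArrayList<>();
        for (Rule r : rules) {
            if (r.matches(cat, sub)) {
                hits.add(r.name);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        List<Rule> rules = List.of(
            new Rule("Rule1", "foo", "bar"), // category AND subcategory
            new Rule("Rule2", "foo", null),  // category only
            new Rule("Rule3", null, "bar")); // subcategory only
        // An event with category=foo, subcategory=bar matches all three.
        System.out.println(matchingRules(rules, "foo", "bar"));
    }
}
```

Because the rules are broadcast to every parallel instance, one event can match several rules at once, which is hard to express with a single keyBy partitioning.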


Cheers,
Till

On Mon, Feb 17, 2020 at 11:10 PM [hidden email] <[hidden email]> wrote:

Re: Process stream multiple time with different KeyBy

Lehuede sebastien
Hi guys,

Thanks for your answers and sorry for the late reply. 

My use case is :

I receive some events on one stream; each event can contain:
  • 1 field category
  • 1 field subcategory
  • 1 field category AND 1 field subcategory
Events are matched against rules which can contain:
  • 1 field category 
  • 1 field subcategory
  • 1 field category AND 1 field subcategory

Now, let's say I receive an event containing the fields category=foo and subcategory=bar. I want to be able to match this event against rules also containing category=foo and subcategory=bar in their specification, but I also want to be able to match it against rules containing only category=foo OR only subcategory=bar. 

But I think I already have a lot of information in your answers. I will definitely take a look at the Fraud Detection System example for the DynamicKeyFunction, and try to work with two different streams (one stream for events with a single key and one stream for events with multiple keys) as suggested by Till. 

Thanks again !
Sébastien

On Tue, Feb 18, 2020 at 6:16 AM Till Rohrmann <[hidden email]> wrote: