Operations dependencies between values with different key in a ConnectedStreams

gdibernardo
Hi guys,

I have a question for you. I have an application with two keyed data streams: one for control and one for data. Each control message represents an operation to be performed on the data values marked with a certain identifier. I connected the two streams and process them with a CoProcessFunction.
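A minimal plain-Java sketch of the per-key logic such a connected, keyed setup might run, with simple operations such as MAX or MEAN over the last n seconds. It assumes hypothetical message types (`ControlMsg`, `DataMsg`), millisecond timestamps, and in-order arrival. It does not use the Flink API: `onControl` and `onData` stand in for `CoProcessFunction`'s `processElement1` and `processElement2`, and the two maps stand in for keyed state, which Flink would scope to the current key automatically.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalDouble;

// Hypothetical message types; timestamps are in milliseconds.
record ControlMsg(int key, String op, long windowMs) {}
record DataMsg(int key, long timestamp, double value) {}

// Plain-Java model of a keyed two-input operator. In Flink, this logic would
// live in a CoProcessFunction and the maps below would be keyed state.
class KeyedOperatorModel {
    private final Map<Integer, ControlMsg> opsByKey = new HashMap<>();
    private final Map<Integer, Deque<DataMsg>> bufferByKey = new HashMap<>();

    // Counterpart of processElement1: remember which operation applies to a key.
    void onControl(ControlMsg c) {
        opsByKey.put(c.key(), c);
    }

    // Counterpart of processElement2: buffer the value, evict entries outside
    // the window (now - windowMs, now], and emit the aggregate if an operation
    // is registered for this key. Eviction only runs once an operation is known.
    OptionalDouble onData(DataMsg d) {
        Deque<DataMsg> buf = bufferByKey.computeIfAbsent(d.key(), k -> new ArrayDeque<>());
        buf.addLast(d);
        ControlMsg op = opsByKey.get(d.key());
        if (op == null) {
            return OptionalDouble.empty();
        }
        long cutoff = d.timestamp() - op.windowMs();
        // Assumes in-order timestamps, so the oldest entries sit at the head.
        while (!buf.isEmpty() && buf.peekFirst().timestamp() <= cutoff) {
            buf.pollFirst();
        }
        return switch (op.op()) {
            case "MAX" -> buf.stream().mapToDouble(DataMsg::value).max();
            case "MEAN" -> buf.stream().mapToDouble(DataMsg::value).average();
            default -> OptionalDouble.empty();
        };
    }
}
```

Each key only ever sees its own buffer here, which mirrors why a single keyed function cannot directly combine results from two different keys.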

The operations I do are really simple, like computing the MAX or the MEAN value of the last n seconds. Now I would like to support more complex operations where the result for one key might depend on the result for another key. To be clearer, I would like to evaluate expressions like: if {ALL the values of data marked with id 22 in the last 5s} are BIGGER THAN {the MEAN value of data marked with id 17 in the last 3s}. In this example, I would have to defer the evaluation of the expression until I have the MEAN value for the right-hand side, and then check it against ALL the data keyed with 22 from the last 5 seconds. I'd like to ask whether something like this is doable in Flink and, in your opinion, what the best way to do it is. I am also looking into how the CEP library works (could it be helpful?).
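The deferral in this example can be illustrated with a small plain-Java sketch, assuming hypothetical names (`Sample`, `DeferredAllGreaterThanMean`) and millisecond timestamps; it models the logic only, not any Flink API. Because keyed state for key 22 and for key 17 is scoped separately, the left side (all values of key 22 in the last 5s) may be available before the right side (the mean of key 17 in the last 3s), so the check has to stay pending until the mean is supplied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical timestamped sample (timestamp in milliseconds).
record Sample(long ts, double value) {}

// Deferred check for: ALL(values of key A in its window) > MEAN(values of key B in its window).
class DeferredAllGreaterThanMean {
    private final List<Double> leftValues = new ArrayList<>();
    private Double rightMean; // stays null until the right-hand side is known

    // Values whose timestamp lies in the half-open window (now - lengthMs, now].
    static List<Double> window(List<Sample> samples, long now, long lengthMs) {
        List<Double> out = new ArrayList<>();
        for (Sample s : samples) {
            if (s.ts() > now - lengthMs && s.ts() <= now) {
                out.add(s.value());
            }
        }
        return out;
    }

    // Mean of the values, or NaN for an empty window (any "v > NaN" is false).
    static double mean(List<Double> values) {
        return values.stream().mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
    }

    void addLeftValues(List<Double> values) { leftValues.addAll(values); }

    void setRightMean(double mean) { rightMean = mean; }

    // Empty while the mean is still missing; otherwise the verdict.
    // Note: allMatch is vacuously true when the left window is empty.
    Optional<Boolean> evaluate() {
        if (rightMean == null) {
            return Optional.empty();
        }
        double m = rightMean;
        return Optional.of(leftValues.stream().allMatch(v -> v > m));
    }
}
```

Whether an empty left window should count as "true" is a policy choice; the sketch simply inherits `allMatch`'s vacuous-truth behavior.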

Thank you so much in advance.

Best,


Gabriele

Re: Operations dependencies between values with different key in a ConnectedStreams

Chao Wang
Hi Gabriele,

I think CEP may be able to handle this kind of expression in general, although I am not sure how to deal with the different time windows (5s and 3s, in your case). Take a look at the available patterns in the CEP library docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html#the-pattern-api

Chao


On 07/28/2017 10:08 AM, Gabriele Di Bernardo wrote:

Re: Operations dependencies between values with different key in a ConnectedStreams

Aljoscha Krettek
Hi Gabriele,

Yes, something like this is possible with Flink. However, you have to implement a two-stage approach for it, which I would roughly call "scatter-gather". You have three operators:

input -> Scatter -> State -> Gather -> output

Here, the "Scatter" operator analyses what state you need for which key, then sends requests for that state downstream along with the original message. All of these messages must be tagged with a unique ID. The "State" operator holds the actual state: it receives the "request" events and sends the state it has for the given key downstream, again with the unique ID that was generated earlier. The "Gather" operator receives the original message and all the different bits of state emitted by the "State" operator. Here you need to buffer and wait until you have received all the messages for a given unique ID. Once you have them, you can process.
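A minimal, single-process Java sketch of these three stages, under stated assumptions: the record names, the UUID-based request ID, and returning the gathered state as a map are hypothetical choices, and direct method calls stand in for the network hops between Flink operators. In an actual job, the State operator would presumably be keyed by data key and the Gather operator by request ID, so that every piece belonging to one request reaches the same Gather instance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical message types flowing between the three stages.
record Original(String requestId, List<Integer> neededKeys, String payload) {}
record StateRequest(String requestId, int key) {}
record StateReply(String requestId, int key, double value) {}

class ScatterStateGather {
    // Scatter: tag the original message with a unique ID and emit one
    // state request per key that the message depends on.
    static Original scatter(String payload, List<Integer> neededKeys, List<StateRequest> out) {
        String id = UUID.randomUUID().toString();
        for (int key : neededKeys) {
            out.add(new StateRequest(id, key));
        }
        return new Original(id, neededKeys, payload);
    }

    // State: owns the per-key state and answers requests, echoing the ID.
    static class StateOp {
        final Map<Integer, Double> stateByKey = new HashMap<>();

        StateReply answer(StateRequest r) {
            return new StateReply(r.requestId(), r.key(), stateByKey.getOrDefault(r.key(), Double.NaN));
        }
    }

    // Gather: buffers originals and replies per ID, in any arrival order,
    // and completes once every needed key has replied.
    static class GatherOp {
        private final Map<String, Original> originals = new HashMap<>();
        private final Map<String, Map<Integer, Double>> replies = new HashMap<>();

        Optional<Map<Integer, Double>> onOriginal(Original o) {
            originals.put(o.requestId(), o);
            return tryComplete(o.requestId());
        }

        Optional<Map<Integer, Double>> onReply(StateReply r) {
            replies.computeIfAbsent(r.requestId(), k -> new HashMap<>()).put(r.key(), r.value());
            return tryComplete(r.requestId());
        }

        private Optional<Map<Integer, Double>> tryComplete(String id) {
            Original o = originals.get(id);
            Map<Integer, Double> got = replies.getOrDefault(id, Map.of());
            if (o == null || !got.keySet().containsAll(o.neededKeys())) {
                return Optional.empty(); // still waiting for some pieces
            }
            originals.remove(id); // clear the buffers for this ID
            replies.remove(id);
            return Optional.of(got);
        }
    }
}
```

Since replies can overtake the original message, the Gather stage buffers both kinds of message by ID rather than assuming the original arrives first.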

This is a presentation from ING who implemented that approach (along with a DSL for specifying calculations): https://www.slideshare.net/FlinkForward/flink-forward-sf-2017-erik-de-nooij-streaming-models-how-ing-adds-models-at-runtime-to-catch-fraudsters. It has nice pictures and might be more helpful than my description.

Best,
Aljoscha

On 28. Jul 2017, at 22:14, Chao Wang <[hidden email]> wrote:
