Filter function in flink

jaswin.shah@outlook.com
Hi,

As far as I can tell, calling filter on a DataStream updates that same DataStream instead of creating a new stream and leaving the DataStream it was applied to intact. My use case is that I apply different filters to resultStream and then process each filtered stream with a different process function, but according to the DAG all the filters seem to be applied to the one original DataStream.

Can anyone help me with how to achieve this?
I want to apply a different filter before each keyedProcess, as in the DAG below, but every filter should produce a new stream rather than update the existing DataStream.

Re: Filter function in flink

Chesnay Schepler
originalStream = ...
// each filter() call returns a new stream; originalStream is left as it is
filteredStream = originalStream.filter(filterA)
differentlyFilteredStream = originalStream.filter(filterB)

originalStream.map(<this works on the original stream>)
filteredStream.map(<this works on the filtered stream>)
differentlyFilteredStream.map(<this works on the differently filtered stream>)
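
The same branching pattern can be sketched without Flink. The following is only a plain-Java analogy, assuming an immutable list stands in for a DataStream; the names BranchingSketch and filter are made up for illustration and are not part of the Flink API. It shows that each filter call derives a new collection and leaves its input untouched.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class BranchingSketch {
    // Hypothetical stand-in for DataStream.filter: builds a NEW list
    // and never mutates the input list.
    static <T> List<T> filter(List<T> input, Predicate<T> predicate) {
        return input.stream().filter(predicate).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> original = List.of(1, 2, 3, 4);

        // Two independent branches derived from the same input.
        List<Integer> evens = filter(original, v -> v % 2 == 0);
        List<Integer> large = filter(original, v -> v > 2);

        System.out.println("original: " + original);
        System.out.println("evens: " + evens);
        System.out.println("large: " + large);
    }
}
```

Both branches read from the same input, and neither call changes what the other branch (or the original) sees.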

On 11/06/2020 13:08, Jaswin Shah wrote:

Re: Filter function in flink

jaswin.shah@outlook.com
Hi Chesnay,
Thanks for responding. So, according to you, the filter function creates a new filtered stream and does not update the original stream. But if that is the case, why does the DAG not show them as different branches? Are you sure that the filter operation creates a new stream and does not change the original one?

From: Chesnay Schepler <[hidden email]>
Sent: 11 June 2020 17:29
To: Jaswin Shah <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Filter function in flink
 

Re: Filter function in flink

Chesnay Schepler
Flink applies an optimization where multiple operations (like maps and filters) are deployed as a single operation, since it is more efficient to call the functions directly one after another than to transmit the intermediate data between them.

This is referred to as "chaining", and the DAG only shows these combined operations.
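
To make the idea concrete, here is a minimal plain-Java sketch of what chained execution amounts to. It is not Flink code and does not reflect Flink's internals in detail; ChainingSketch, Branch and runChained are hypothetical names chosen for illustration. The assumption is simply that a chain pushes each record through every branch by direct method calls, without materializing an intermediate stream per filter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

final class ChainingSketch {
    // One logical branch: a filter followed by a map
    // (hypothetical stand-ins for Flink operators).
    static final class Branch {
        final Predicate<Integer> filter;
        final Function<Integer, String> map;

        Branch(Predicate<Integer> filter, Function<Integer, String> map) {
            this.filter = filter;
            this.map = map;
        }
    }

    // "Chained" execution: each record is handed to every branch by a
    // direct call inside one loop. No intermediate collection is built,
    // and each branch sees the unmodified input record.
    static List<String> runChained(List<Integer> source, List<Branch> branches) {
        List<String> sink = new ArrayList<>();
        for (Integer record : source) {
            for (Branch branch : branches) {
                if (branch.filter.test(record)) {
                    sink.add(branch.map.apply(record));
                }
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        List<Branch> branches = List.of(
                new Branch(v -> v == 1, v -> "1s: " + v),
                new Branch(v -> v == 2, v -> "2s: " + v),
                new Branch(v -> true, v -> "original: " + v));
        runChained(List.of(1, 2, 3), branches).forEach(System.out::println);
    }
}
```

Although everything runs in a single loop (one "box" in the picture), the three branches remain logically separate: filtering in one branch never removes records from another.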

Here's another example that shows the original stream staying intact:
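import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Integer> originalInput = env.fromElements(1, 2, 3);

// Each filter() call returns a new DataStream; originalInput itself is not modified.
DataStream<Integer> ones = originalInput.filter(value -> value == 1);
DataStream<Integer> twos = originalInput.filter(value -> value == 2);

ones.map(value -> "1s: " + value).print();
twos.map(value -> "2s: " + value).print();
// The unfiltered branch still sees all three elements.
originalInput.map(value -> "original: " + value).startNewChain().print();

env.execute();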
Output (the order of the lines may vary):

original: 1
original: 2
original: 3
1s: 1
2s: 2

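You can break up a chain by calling startNewChain() on an operator (disableChaining() on an operator, or disableOperatorChaining() on the environment, turns chaining off altogether), but this is usually only done for demonstration purposes since it impedes performance.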
ones.map(value -> "1s: " + value).startNewChain().print();
twos.map(value -> "2s: " + value).startNewChain().print();
originalInput.map(value -> "original: " + value).startNewChain().print();
On 02/07/2020 18:26, Jaswin Shah wrote: