Hi,
The filter function on a DataStream seems to update that same DataStream rather than creating a new stream and leaving the DataStream it is applied to intact. My use case: I apply different filters on the result stream and then process each filtered stream with a different process function, but according to the DAG, it seems to apply all the filter changes to the existing original DataStream.
Can anyone help me with how to achieve this?
For example, I want to apply a different filter individually before each keyedProcess, as in the DAG below, but every filter should produce a new stream rather than updating the existing DataStream.
originalStream = ...
filteredStream = originalStream.filter(filterA)
differentelyFilteredStream = originalStream.filter(filterB)

originalStream.map(<this works on the original stream>)
filteredStream.map(<this works on the filtered stream>)
differentelyFilteredStream.map(<this works on the differently filtered stream>)
On 11/06/2020 13:08, Jaswin Shah wrote:
Hi Chesnay,
Thanks for responding. So, according to you, the filter function creates a new filtered stream and does not update the original stream. But if that is the case, why does the DAG not show them as different branches? Are you sure that the filter operation does not change the original stream but creates a new one?
From: Chesnay Schepler <[hidden email]>
Sent: 11 June 2020 17:29
To: Jaswin Shah <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Filter function in flink
Flink does some optimizations where multiple operations (like maps and filters) are deployed as one operation, since it is more efficient to call all functions directly one after another instead of transmitting the intermediate data.
This is referred to as "chaining", and the DAG only shows these combined operations.
Here's another example that shows the
original stream staying intact:
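import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> originalInput = env.fromElements(1, 2, 3);
        // each filter() returns a new DataStream; originalInput itself is not modified
        DataStream<Integer> ones = originalInput.filter(value -> value == 1);
        DataStream<Integer> twos = originalInput.filter(value -> value == 2);
        ones.map(value -> "1s: " + value).print();
        twos.map(value -> "2s: " + value).print();
        // startNewChain() starts a new chain at this map, so it is not chained to its input
        originalInput.map(value -> "original: " + value).startNewChain().print();
        env.execute();
    }
}

Output:
original: 1
original: 2
original: 3
1s: 1
2s: 2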
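You can break up chains by calling startNewChain() on an operator (such as the result of map()), which starts a new chain at that operator so it is not chained to its predecessors. This is usually only done for demonstration purposes, since it impedes performance.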
ones.map(value -> "1s: " + value).startNewChain().print();
twos.map(value -> "2s: " + value).startNewChain().print();
originalInput.map(value -> "original: " + value).startNewChain().print();