Datastream broadcast with KeyBy


anujk
Currently we have a Flink pipeline running with Data-Src -> KeyBy ->
ProcessFunction. State management (with RocksDB) and timers are working
well.
Now we have to extend this with another config stream, which we want to
broadcast to all process operators. So we wanted to connect the data stream
with the config stream (with the config stream being broadcast) and use a
CoProcessFunction to handle both streams.

The keyBy operation uses hash-based partitioning, and a custom partitioner can
return only one partition (returning an array of selected channels, as
BroadcastPartitioner does, is not allowed).
We would have liked this to work:

dataStream.keyBy().connect(confStream.broadcast()).process(…RichCoProcessFunction()…)

but it fails with a message saying that both streams must be keyed.
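To make the partitioning limitation concrete, here is a simplified plain-Java model (not Flink code; `ChannelSelectionModel` and its methods are hypothetical names) of the two routing schemes: keyBy-style hashing sends each record to exactly one downstream channel, while broadcasting sends it to every channel. A custom partitioner whose method returns a single partition index can only express the first shape.

```java
import java.util.Arrays;

// Simplified model, NOT Flink's implementation: how one record is routed to
// downstream channels under keyBy-style hashing versus broadcasting.
public class ChannelSelectionModel {

    // keyBy-style routing: the record goes to exactly ONE channel derived from
    // its key's hash. (Flink actually maps keys to key groups first; a plain
    // modulo is used here only for illustration.)
    public static int[] selectByKeyHash(Object key, int numChannels) {
        return new int[] {Math.floorMod(key.hashCode(), numChannels)};
    }

    // Broadcast-style routing: the record goes to EVERY channel.
    public static int[] selectBroadcast(int numChannels) {
        int[] all = new int[numChannels];
        for (int i = 0; i < numChannels; i++) {
            all[i] = i;
        }
        return all;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        System.out.println("data record, key user-7 -> "
                + Arrays.toString(selectByKeyHash("user-7", parallelism)));
        System.out.println("config record           -> "
                + Arrays.toString(selectBroadcast(parallelism)));
    }
}
```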

Is there any way to make this work?

The unkeyed version works, and the config stream is broadcast as expected:

dataStream.connect(confStream.broadcast()).flatMap(... RichCoFlatMapFunction() …)

But we really want the keyBy and ProcessFunction functionality.
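To pin down the semantics being asked for, the sketch below is a plain-Java model (not Flink code; `KeyedWithBroadcastModel`, `sendData`, and `sendConfig` are hypothetical names) of one operator with two inputs: each data record reaches only the parallel instance that owns its key, while each config update reaches every instance, which keeps a local copy next to its per-key state. In the real pipeline the per-key map would be Flink keyed state (here backed by RocksDB) and the instances would be parallel subtasks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model, NOT Flink code: data records are routed by key to ONE
// parallel instance, config records are delivered to EVERY instance, and each
// instance keeps per-key state plus its own copy of the latest config.
public class KeyedWithBroadcastModel {

    // One parallel instance of a hypothetical keyed co-process operator.
    public static final class Instance {
        public final Map<String, Long> countPerKey = new HashMap<>(); // stands in for keyed state
        public String config = "default";                             // local copy of the broadcast config
        public final List<String> output = new ArrayList<>();

        void processData(String key) {
            long count = countPerKey.merge(key, 1L, Long::sum);
            output.add(key + " count=" + count + " config=" + config);
        }

        void processConfig(String newConfig) {
            config = newConfig;
        }
    }

    public final List<Instance> instances = new ArrayList<>();

    public KeyedWithBroadcastModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
    }

    // Keyed input: exactly one instance sees a given key (simplified hash routing).
    public void sendData(String key) {
        instances.get(Math.floorMod(key.hashCode(), instances.size())).processData(key);
    }

    // Broadcast input: every instance receives the config update.
    public void sendConfig(String newConfig) {
        for (Instance instance : instances) {
            instance.processConfig(newConfig);
        }
    }

    public static void main(String[] args) {
        KeyedWithBroadcastModel op = new KeyedWithBroadcastModel(3);
        op.sendData("a");
        op.sendConfig("v2");
        op.sendData("a");
        op.sendData("b");
        for (int i = 0; i < op.instances.size(); i++) {
            System.out.println("instance " + i + ": " + op.instances.get(i).output);
        }
    }
}
```

Running `main` shows the second `a` record seeing `count=2 config=v2` on whichever instance owns key `a`: per-key state survives across records while the config update has reached every instance.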

Thanks,
Anuj



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Datastream broadcast with KeyBy

Piotr Nowojski
Hi,

Could you elaborate on the problem you are having? Which exception(s) are you getting? I have tested a simple example like this and it seems to work as expected:

DataStreamSource<Integer> input = env.fromElements(1, 2, 3, 4, 5, 1, 2, 3);

DataStreamSource<Integer> confStream = env.fromElements(42);

input.keyBy(new MyKeySelector()).connect(confStream.broadcast()).process(new MyCoProcessFunction()).print();

Thanks, Piotrek

On 10 Jan 2018, at 10:01, anujk wrote:
[original message quoted in full; see the first post above]


Re: Datastream broadcast with KeyBy

Fabian Hueske-2
Hi Anuj,

connecting a keyed stream and a broadcast stream is not supported at the moment, but there is work in progress [1] to add this functionality in the next release (Flink 1.5.0).

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-3659

2018-01-10 12:21 GMT+01:00 Piotr Nowojski wrote:
[earlier messages quoted in full; see above]