What is the canonical way to accomplish this:
> Given a ConnectedStreams, e.g., a CoFlatMap UDF, how to prevent any processing of the data stream until the control stream is "ready", so to speak

My particular use case is as follows: I have a CoFlatMap function. The data stream contains elements that need to be enriched with additional information (they come with some fields empty). The missing information is taken from the control stream, whose elements come through a Kafka source. Essentially, what I want is to pause any processing until the full (control) topic has been read; otherwise (at least initially) the output elements will not be enriched as expected.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
A combination of `BoundedMultiInput` and `InputSelectable` could help. You could look at `org.apache.flink.table.runtime.operators.join.HashJoinOperator` for a usage example. The control topic does not have to be bounded.

Later responses may suggest other approaches; I cannot tell whether this one is canonical or not.

Best,
Kezhu Wang

On February 17, 2021 at 13:03:42, Salva Alcántara ([hidden email]) wrote:
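Kezhu's suggestion relies on the operator telling the runtime which input to read next. Below is a minimal, framework-free Java sketch of that scheduling idea. It assumes a bounded control input, as with the build side in `HashJoinOperator`, and a simple key lookup as the enrichment. No Flink classes are used: `Selection`, `nextSelection()`, `endControlInput()` and the toy `run` loop are hypothetical stand-ins. In Flink itself the corresponding hooks would be `InputSelectable#nextSelection()`, which returns an `InputSelection`, and `BoundedMultiInput#endInput(int)`. The sketch mirrors only their roles, not their signatures.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free simulation of the InputSelectable + BoundedMultiInput idea.
// All names are hypothetical; no Flink classes are used.
public class InputSelectionSketch {

    // Stand-in for Flink's InputSelection: read only the control input, or both.
    enum Selection { CONTROL_ONLY, ALL }

    private boolean controlEnded = false;
    private final Map<String, String> enrichment = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Plays the role of InputSelectable#nextSelection(): stay on the control input until it ends.
    Selection nextSelection() {
        return controlEnded ? Selection.ALL : Selection.CONTROL_ONLY;
    }

    // Plays the role of BoundedMultiInput#endInput(int) being called for the control input.
    void endControlInput() {
        controlEnded = true;
    }

    void processControl(String[] kv) {
        enrichment.put(kv[0], kv[1]);
    }

    void processData(String key) {
        output.add(key + "=" + enrichment.getOrDefault(key, "<missing>"));
    }

    // Toy "runtime" loop that honours the operator's selection on every step.
    public static List<String> run(Deque<String[]> control, Deque<String> data) {
        InputSelectionSketch op = new InputSelectionSketch();
        while (!control.isEmpty() || !data.isEmpty()) {
            if (op.nextSelection() == Selection.CONTROL_ONLY) {
                if (control.isEmpty()) {
                    op.endControlInput(); // bounded control input is exhausted
                } else {
                    op.processControl(control.poll());
                }
            } else {
                // The control input has ended, so only data elements remain.
                op.processData(data.poll());
            }
        }
        return op.output;
    }

    public static void main(String[] args) {
        Deque<String[]> control = new ArrayDeque<>(Arrays.asList(
                new String[] {"a", "1"}, new String[] {"b", "2"}));
        Deque<String> data = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        System.out.println(run(control, data)); // [a=1, b=2, c=<missing>]
    }
}
```

With an unbounded control topic the end-of-input signal never arrives, so some other readiness condition would be needed; the thread leaves that open.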
Hi Kezhu,
`InputSelectable` is currently not exposed in the DataStream API because it might have side effects that need to be considered (e.g., do checkpoints still go through?). In any case, we don't have a good story for blocking a control stream yet. The best option is to buffer the other stream in state until the control stream is ready. You can also artificially slow down the other stream until then (e.g., by sleeping) so that you do not buffer too much state.

I hope this helps.

Regards,
Timo

On 17.02.21 14:35, Kezhu Wang wrote:
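Timo's recommended approach can be sketched as follows. This minimal, framework-free Java example buffers data elements that arrive before the control side is ready and flushes them, enriched, once readiness is signalled. It assumes the enrichment is a simple key lookup. `BufferingEnricher`, `onData`, `onControl` and `markControlReady` are hypothetical names standing in for the two `flatMap` callbacks of a CoFlatMap function. In a real Flink job the pending buffer and the lookup table would be kept in Flink state (for example `ListState` and `MapState`) rather than in plain collections. The thread does not specify how "ready" is detected, so the readiness trigger here is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free sketch of "buffer the data stream until the control stream is ready".
// In Flink the buffer and lookup table would live in state; here they are plain collections.
// All names are hypothetical.
public class BufferingEnricher {

    private final Map<String, String> enrichment = new HashMap<>(); // filled from the control stream
    private final List<String> pending = new ArrayList<>();          // data elements awaiting readiness
    private boolean controlReady = false;

    // Analogue of flatMap1: a data element arrives.
    public void onData(String key, List<String> out) {
        if (controlReady) {
            out.add(enrich(key));
        } else {
            pending.add(key); // do not emit un-enriched elements yet
        }
    }

    // Analogue of flatMap2: a control element arrives.
    public void onControl(String key, String value) {
        enrichment.put(key, value);
    }

    // Hypothetical readiness trigger; how "ready" is detected is left open.
    public void markControlReady(List<String> out) {
        controlReady = true;
        for (String key : pending) {
            out.add(enrich(key)); // flush buffered elements in arrival order
        }
        pending.clear();
    }

    private String enrich(String key) {
        return key + "=" + enrichment.getOrDefault(key, "<missing>");
    }

    public static void main(String[] args) {
        BufferingEnricher op = new BufferingEnricher();
        List<String> out = new ArrayList<>();
        op.onData("a", out);      // buffered: control side not ready
        op.onControl("a", "1");
        op.onControl("b", "2");
        op.onData("b", out);      // still buffered
        op.markControlReady(out); // flushes a and b
        op.onData("c", out);      // emitted immediately
        System.out.println(out);  // [a=1, b=2, c=<missing>]
    }
}
```

Throttling the data side, as Timo also suggests, would bound how large `pending` can grow; that part is not shown here.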
Note that the question is also posted on SO [1].

On Wed, Feb 17, 2021 at 3:31 PM Timo Walther <[hidden email]> wrote:
Hi all,

Thanks Arvid and Timo for more candidates. I also think “buffering until the control side is ready” is the more canonical option at the current stage of Flink. Timo has created FLINK-21392 [1] for exposing a user-friendly DataStream API to temporarily block one input.

If one really wants to go deep down the rabbit hole, as Arvid said, I have one approach off the top of my head. A combination of `MultipleInputStreamOperator`, `BoundedMultiInput`, `InputSelectable`, a FLIP-27 source and `ChainingStrategy.HEAD_WITH_SOURCES` should achieve the goal without interfering with checkpointing, but the control side must not be bounded before FLIP-147 is delivered.

[1] FLINK-21392: https://issues.apache.org/jira/browse/FLINK-21392

Kezhu Wang
On February 17, 2021 at 22:58:23, Arvid Heise ([hidden email]) wrote:
Note^2: `InputSelectable` is a `@PublicEvolving` API, so it can be used. However, as Timo pointed out, it would block checkpointing. If I remember correctly, there is a `checkState` that does not allow using `InputSelectable` with checkpointing enabled.

Piotrek

On Wed, 17 Feb 2021 at 16:46 Kezhu Wang <[hidden email]> wrote:
Piotr is right, so just ignore what I said. It is the price of going deep down the rabbit hole :-).

Best,
Kezhu Wang

On February 17, 2021 at 23:48:30, Piotr Nowojski ([hidden email]) wrote:
> Combination of `MultipleInputStreamOperator`, `BoundedMultiInput`, `InputSelectable`, FLIP-27 source and `ChainingStrategy.HEAD_WITH_SOURCES` should achieve the goal and not interfering with checkpoint, but the control side must not be bounded before FLIP-147 delivered.

Hmmmm, but I think in principle you are right, Kezhu. This would work right now if we just removed the check inside `StreamingJobGraphGenerator#preValidate`, or, more precisely, modified the check to support `InputSelectable` in source tasks. But that's probably a very, very narrow use case.

Piotrek

On Wed, 17 Feb 2021 at 16:58 Kezhu Wang <[hidden email]> wrote:
Good to know, Kezhu. Many thanks again!