ConnectedStreams paused until control stream “ready”


Salva Alcántara
What is the canonical way to accomplish this:

> Given a ConnectedStreams (e.g., a CoFlatMap UDF), how can any processing of the data stream be prevented until the control stream is "ready", so to speak?

My particular use case is as follows: I have a CoFlatMap function. The data
stream contains elements that need to be enriched with additional
information (they arrive with some fields empty). The missing information is
taken from the control stream, whose elements come from a Kafka source.
Essentially, what I want is to pause all processing until the full (control)
topic has been read; otherwise (at least initially) the output elements
will not be enriched as expected.
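To make the problem concrete, here is a minimal plain-Java model (no Flink dependency, Java 16+ records) of the naive enrichment described above. The `Event`/`ControlRecord` types and the `NaiveEnricher` class are hypothetical; the two methods only mirror the `flatMap1`/`flatMap2` split of a `CoFlatMapFunction`, and the `out` list stands in for the `Collector`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical element types (not from the original post).
record Event(String key, String extra) {}
record ControlRecord(String key, String extra) {}

// Plain-Java stand-in for a CoFlatMapFunction<Event, ControlRecord, Event>:
// flatMap1 receives data elements, flatMap2 receives control elements.
class NaiveEnricher {
    private final Map<String, String> lookup = new HashMap<>();
    final List<Event> out = new ArrayList<>(); // stands in for the Collector

    void flatMap1(Event e) {
        // Emits immediately: if the control entry has not arrived yet, `extra` stays null.
        out.add(new Event(e.key(), lookup.get(e.key())));
    }

    void flatMap2(ControlRecord c) {
        // Remember the enrichment value for later data elements.
        lookup.put(c.key(), c.extra());
    }
}
```

Calling `flatMap1(new Event("a", null))` before the matching `flatMap2(new ControlRecord("a", "x"))` emits an event whose `extra` field is still `null`, which is the "not enriched as expected" output during start-up.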



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: ConnectedStreams paused until control stream “ready”

Kezhu Wang
A combination of `BoundedMultiInput` and `InputSelectable` could help. You could look at `org.apache.flink.table.runtime.operators.join.HashJoinOperator`
for a usage example. The control topic has to be bounded.
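The idea behind combining these two interfaces can be modelled without Flink as follows. This is a sketch under stated assumptions, not Flink's API: `Selection` is a simplified stand-in for Flink's `InputSelection`, the `nextSelection`/`endInput` methods only imitate `InputSelectable#nextSelection` and `BoundedMultiInput#endInput(int)`, and `ToyInputProcessor` is a hypothetical loop playing the role of the task's input processor. Input 2 is assumed to be the control side.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Simplified stand-in for Flink's InputSelection (NOT the real class).
enum Selection { FIRST, SECOND }

class SelectiveEnricher {
    private final Map<String, String> lookup = new HashMap<>();
    private boolean controlEnded = false;
    final List<String> out = new ArrayList<>();

    // Imitates InputSelectable#nextSelection: read only the control input (SECOND)
    // until it has ended, then switch to the data input (FIRST).
    Selection nextSelection() {
        return controlEnded ? Selection.FIRST : Selection.SECOND;
    }

    // Imitates BoundedMultiInput#endInput(int inputId); input 2 is the control side.
    void endInput(int inputId) {
        if (inputId == 2) controlEnded = true;
    }

    void processData(String key) { out.add(key + "=" + lookup.get(key)); }

    void processControl(String[] kv) { lookup.put(kv[0], kv[1]); }
}

// Hypothetical driver loop playing the role of the task's input processor.
class ToyInputProcessor {
    static void run(SelectiveEnricher op, Queue<String> data, Queue<String[]> control) {
        boolean dataEnded = false;
        while (!dataEnded) {
            if (op.nextSelection() == Selection.SECOND) {
                String[] c = control.poll();
                if (c == null) op.endInput(2); else op.processControl(c);
            } else {
                String d = data.poll();
                if (d == null) { op.endInput(1); dataEnded = true; } else op.processData(d);
            }
        }
    }
}
```

If the control queue never runs empty, `endInput(2)` is never called and the loop never switches to the data input, which is why this pattern relies on the control input being bounded.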

There may be other approaches in later responses; I cannot tell whether this one is canonical.

Best,
Kezhu Wang

In reply to Salva Alcántara (February 17, 2021 at 13:03:42).

Re: ConnectedStreams paused until control stream “ready”

Timo Walther
Hi Kezhu,

`InputSelectable` is currently not exposed in the DataStream API because
it might have side effects that need to be considered (e.g., do
checkpoints still go through?). In any case, we don't have a good story
for blocking a control stream yet. The best option is to buffer the
other stream in state until the control stream is ready. You can also
artificially slow down the other stream until then (e.g., by sleeping) to
avoid buffering too much state.
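A minimal sketch of the buffering option, again as plain Java (Java 16+ records) rather than Flink code. It assumes the control stream carries some hypothetical end-of-bootstrap signal, modelled here as the `endOfBootstrap` flag on `ControlRecord`; how "ready" is actually detected (for example from Kafka end offsets) is left open in the thread. In a real job the buffer, lookup table and flag would be kept in Flink state (e.g., `ListState`, `MapState`, `ValueState`) so they are included in checkpoints.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical types; `endOfBootstrap` is an assumed readiness marker.
record Event(String key, String extra) {}
record ControlRecord(String key, String extra, boolean endOfBootstrap) {}

// Plain-Java model of a CoFlatMapFunction that buffers data until the control side is ready.
class BufferingEnricher {
    private final Map<String, String> lookup = new HashMap<>(); // would be MapState in Flink
    private final List<Event> buffer = new ArrayList<>();       // would be ListState in Flink
    private boolean ready = false;                              // would be ValueState in Flink
    final List<Event> out = new ArrayList<>();                  // stands in for the Collector

    void flatMap1(Event e) {
        if (ready) {
            out.add(enrich(e));
        } else {
            buffer.add(e); // hold back until the control stream is ready
        }
    }

    void flatMap2(ControlRecord c) {
        if (c.key() != null) lookup.put(c.key(), c.extra());
        if (c.endOfBootstrap() && !ready) {
            ready = true;
            for (Event e : buffer) out.add(enrich(e)); // flush buffered data in arrival order
            buffer.clear();
        }
    }

    private Event enrich(Event e) {
        return new Event(e.key(), lookup.get(e.key()));
    }
}
```

This sketch ignores parallelism: with several parallel instances, each instance needs to see the readiness signal (for example by broadcasting it), otherwise some instances would keep buffering.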

I hope this helps.

Regards,
Timo


In reply to Kezhu Wang (17.02.21 14:35).

Re: ConnectedStreams paused until control stream “ready”

Arvid Heise

In reply to Timo Walther (Wed, Feb 17, 2021 at 3:31 PM).

Re: ConnectedStreams paused until control stream “ready”

Kezhu Wang
Hi all,

Thanks, Arvid and Timo, for the additional candidates.

I also think “buffering until the control side is ready” is the more canonical approach at the current stage of Flink.

Timo has created FLINK-21392 to expose a user-friendly DataStream API for temporarily blocking one input.

If one really wants to go deep down the rabbit hole, as Arvid said, here is one approach off the top of my head.

A combination of `MultipleInputStreamOperator`, `BoundedMultiInput`, `InputSelectable`, a FLIP-27 source, and `ChainingStrategy.HEAD_WITH_SOURCES`
should achieve the goal without interfering with checkpointing, but the control side must not be bounded until FLIP-147 is delivered.


Best,
Kezhu Wang

In reply to Arvid Heise (February 17, 2021 at 22:58:23).

Re: ConnectedStreams paused until control stream “ready”

Piotr Nowojski
Note^2: `InputSelectable` is a `@PublicEvolving` API, so it can be used. However, as Timo pointed out, it would block checkpointing. If I remember correctly, there is a `checkState` that does not allow `InputSelectable` to be used with checkpointing enabled.

Piotrek

In reply to Kezhu Wang (Wed, 17 Feb 2021 at 16:46).

Re: ConnectedStreams paused until control stream “ready”

Kezhu Wang
Piotr is right. So just ignore my words. It is the price of going deep down the rabbit hole:-).


Best,
Kezhu Wang


In reply to Piotr Nowojski (February 17, 2021 at 23:48:30).

Re: ConnectedStreams paused until control stream “ready”

Piotr Nowojski
> A combination of `MultipleInputStreamOperator`, `BoundedMultiInput`, `InputSelectable`, a FLIP-27 source, and `ChainingStrategy.HEAD_WITH_SOURCES`
> should achieve the goal without interfering with checkpointing, but the control side must not be bounded until FLIP-147 is delivered.

Hmmmm, but I think in principle you are right, Kezhu. This would work right now if we just removed the check inside `StreamingJobGraphGenerator#preValidate`, or, more precisely, modified the check to support `InputSelectable` in source tasks. But that's probably a very, very narrow use case.

Piotrek

In reply to Kezhu Wang (Wed, 17 Feb 2021 at 16:58).

Re: ConnectedStreams paused until control stream “ready”

Salva Alcántara
In reply to this post by Kezhu Wang
Good to know, Kezhu. Many thanks again!


