How to export all not-null keyed ValueState

Averell
Hi,

I have a keyed ValueState that is set for only about 1% of the total
number of keys that I have. Is there any way to get the values of
all those states?
I looked at the queryable state option, but it seems to support
querying only by a single key.

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to export all not-null keyed ValueState

Congxian Qiu
Hi, Averell

AFAIK, there is no way to get all the key-value pairs of a ValueState, but MapState has a method called `entries` that can do this, so maybe you can use MapState as a workaround.
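Keep in mind that MapState is keyed state too, so `entries()` only returns the entries stored under the current key. The plain-Scala sketch below models that scoping; the class and method names are hypothetical and this is not Flink code. It assumes, as one possible reading of the workaround, that all toggles are kept in a single MapState under one constant key; the trade-off is that every record with that key is handled by the same parallel instance.

```scala
import scala.collection.mutable

// Hypothetical model of keyed MapState, not Flink code:
// outer key = the stream key, inner map = the MapState contents for that key.
final class KeyedMapStateModel[K, UK, UV] {
  private val backend = mutable.Map.empty[K, mutable.LinkedHashMap[UK, UV]]

  // Like MapState.put() while `currentKey` is the key being processed.
  def put(currentKey: K, mapKey: UK, value: UV): Unit =
    backend.getOrElseUpdate(currentKey, mutable.LinkedHashMap.empty[UK, UV]).update(mapKey, value)

  // Like MapState.entries(): only the entries of the CURRENT stream key.
  def entries(currentKey: K): List[(UK, UV)] =
    backend.get(currentKey).map(_.toList).getOrElse(Nil)
}
```

For example, after `put("all", "a", true)` and `put("all", "b", false)`, `entries("all")` returns both pairs, while `entries` for any other key sees none of them.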

(In reply to Averell, May 7, 2019, 16:16 +0800)

Re: How to export all not-null keyed ValueState

Fabian Hueske-2
Hi,

The KeyedBroadcastProcessFunction has a method to iterate over all keys of a keyed state: applyToKeyedState().
It is available via the Context object of the processBroadcastElement() method.
Hence you need a broadcast message to trigger the operation.
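A rough plain-Scala model of this mechanism may help; the names are hypothetical and it is not Flink code. Elements of the keyed input can only touch the state of their own key, while an element of the broadcast input carries no key and reaches the keyed state by visiting every key that currently has a value, which is the role applyToKeyedState() plays in the real Context.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of ONE parallel instance of a
// KeyedBroadcastProcessFunction. This is not Flink code.
final class KeyedStateScanModel[K, V] {
  // Stand-in for a keyed ValueState: only keys whose value was set appear here.
  private val valueState = mutable.LinkedHashMap.empty[K, V]

  // Keyed input (like processElement): reads/writes the current key's state only.
  def processElement(currentKey: K, newValue: V): Unit =
    valueState.update(currentKey, newValue)

  // Broadcast input (like processBroadcastElement): the trigger has no key, so
  // the keyed state is reached by visiting every key that has a value.
  def processBroadcastElement[T](trigger: T)(function: (K, V) => Unit): Unit =
    valueState.foreach { case (k, v) => function(k, v) }
}
```

The LinkedHashMap only makes the model's visiting order deterministic; it does not imply any ordering guarantee from Flink.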

Best, Fabian

(In reply to Congxian Qiu, Thu, May 9, 2019, 08:46)

Re: How to export all not-null keyed ValueState

Averell
Thank you, Congxian and Fabian.

@Fabian: could you please give a bit more detail? My understanding is that I
should pass the context itself and an OutputTag to the KeyedStateFunction
parameter of KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and
send out the side output from within that KeyedStateFunction.process(). Do I
understand your idea correctly?

BTW, I have another question regarding KeyedBroadcastProcessFunction best
practices: I have two streams, Data and Toggle. The Toggle stream is just a
keyed boolean stream used to filter records from the Data stream, and I
implement that filter with a simple RichCoFlatMapFunction.

Now I want to export the list of keys that are currently toggled on.
Should I
(1) add one more KeyedBroadcastProcessFunction operator (with Toggle and the
broadcast trigger stream as its inputs), or
(2) replace that RichCoFlatMapFunction with a new
KeyedBroadcastProcessFunction that does both filtering and exporting? Doing
this would require unioning Toggle and Data into one single keyed stream.

Thanks and best regards,
Averell





Re: How to export all not-null keyed ValueState

Fabian Hueske-2
Hi,

Passing a Context through a DataStream definitely does not work.
The keyed state that you want to scan over needs to be in the KeyedBroadcastProcessFunction itself.

For the toggle filter use case, you would need a unioned stream of Toggle and StateReport events.
For the output, you can use side outputs to route the different outputs to different streams.

Best, Fabian

(In reply to Averell, Thu, May 9, 2019, 10:34)

Re: How to export all not-null keyed ValueState

Averell
Hi Fabian,

Sorry, but I am still confused by your suggestion. If I union the Toggle stream with the StateReportTrigger stream, does that mean I need to turn my Toggles into broadcast state? Or is there some way to modify the keyed state from within the processBroadcastElement() method?

I tried to implement the other approach (option (2) from my previous email). It seems to work, but I am not confident about it and not sure whether it has any flaws. Could you please comment on it?
In my view, this implementation requires a lot of type casting for my main data stream, which might have a high impact on performance (my toggle is on for only about 1% of the keys, and the rate of input1.left is less than a millionth of the rate of input1.right).
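import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.runtime.state.KeyedStateFunction
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.util.Collector

// Key, MyEvent and Toggle (a (Key, Boolean) tuple) are defined elsewhere in the job.

/**
 * This KeyedBroadcastProcessFunction has:
 * input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
 *   input1.left: Toggles in the form of a tuple (Key, Boolean).
 *     When Toggle._2 == true, records from input1.right for the same key are forwarded to the main output.
 *     If it is false, records from input1.right for that key are dropped.
 *   input1.right: the main data stream.
 *
 * input2: a broadcast stream of StateReport triggers. When a record arrives on this stream,
 * the current value of every Toggle is sent out via the outputTag.
 */
class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
    extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {

  val toggleStateDescriptor = new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])

  override def processElement(in1: Either[Toggle, MyEvent],
      readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
      collector: Collector[MyEvent]): Unit = {
    in1 match {
      case Left(toggle) =>
        // Store the latest toggle value for the current key.
        getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
      case Right(event) =>
        // An unset ValueState[Boolean] reads as false in Scala (null unboxes to false),
        // so events for keys that were never toggled on are dropped.
        if (getRuntimeContext.getState(toggleStateDescriptor).value())
          collector.collect(event)
    }
  }

  override def processBroadcastElement(in2: Any,
      context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
      collector: Collector[MyEvent]): Unit = {
    // Visit every key that has a value for this descriptor and emit (key, value)
    // to the side output. An explicit KeyedStateFunction instance also compiles
    // on Scala 2.11, which has no lambda-to-SAM conversion.
    context.applyToKeyedState(toggleStateDescriptor,
      new KeyedStateFunction[Key, ValueState[Boolean]] {
        override def process(k: Key, s: ValueState[Boolean]): Unit =
          if (s != null) context.output(outputTag, (k, s.value()))
      })
  }
}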
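To check the filtering and export logic above without a Flink cluster, the per-key behaviour can be replayed in plain Scala. This is a hypothetical, simplified model (`ToggleFilterModel` and `replay` are made-up names, not Flink API); it assumes `Toggle` is a `(key, Boolean)` tuple as in the doc comment, models each data record as `(key, event)`, and mirrors the fact that an unset `ValueState[Boolean]` reads as `false`.

```scala
object ToggleFilterModel {
  // Same shape as the thread's Toggle: (key, isOn).
  type Toggle[K] = (K, Boolean)

  /** Replays one keyed Either stream in arrival order (hypothetical helper).
    * Left = toggle update, Right = (key, event).
    * Returns the forwarded events and the final per-key toggle state,
    * i.e. what a StateReport trigger would export at that point. */
  def replay[K, E](input: Seq[Either[Toggle[K], (K, E)]]): (Vector[E], Map[K, Boolean]) = {
    var state = Map.empty[K, Boolean]
    val forwarded = Vector.newBuilder[E]
    input.foreach {
      case Left((key, on)) =>
        state += key -> on
      case Right((key, event)) =>
        // A key without state behaves like `false`, so its events are dropped.
        if (state.getOrElse(key, false)) forwarded += event
    }
    (forwarded.result(), state)
  }
}
```

For example, replaying `Right(("a", 1))`, `Left(("a", true))`, `Right(("a", 2))` forwards only event `2`, because the first event arrives before key `a` is toggled on.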
Thanks for your help.
Regards,
Averell

(In reply to Fabian Hueske, Thu, May 9, 2019, 7:31 PM)

Re: How to export all not-null keyed ValueState

Fabian Hueske-2
Hi Averell,

Ah, sorry. I had assumed the toggle events were broadcast anyway.
Since both of your streams are keyed, your current solution looks fine to me.

Best,
Fabian

(In reply to Averell Huyen Levan, Fri, May 10, 2019, 03:13)

Re: How to export all not-null keyed ValueState

Averell
Hi Fabian,

Thanks for that. However, as I mentioned in my previous email, that implementation requires a lot of type casting/object wrapping.
I tried broadcasting the Toggle stream instead: the toggles are saved in a MapState, and whenever an export trigger record arrives, I send out the contents of that MapState. This implementation does not use applyToKeyedState at all. The problem is that I receive duplicated output (one copy from each parallel instance).
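The duplicates are expected with broadcast state: every parallel instance holds the same full copy of it, and every instance receives each broadcast trigger. The plain-Scala sketch below (hypothetical names, not Flink code) models that effect, together with one common mitigation that the thread itself does not discuss: letting only the instance with subtask index 0 emit. In the Flink versions of that time the index is available via `getRuntimeContext.getIndexOfThisSubtask`; check this against your version.

```scala
// Hypothetical model, not Flink code: `parallelism` instances each hold an
// identical copy of the broadcast toggle map and each receive the trigger.
object BroadcastExportModel {
  /** Returns everything emitted when one trigger reaches all subtasks. */
  def emitOnTrigger(toggles: Map[String, Boolean], parallelism: Int,
                    onlySubtaskZero: Boolean): Seq[(String, Boolean)] =
    (0 until parallelism).flatMap { subtaskIndex =>
      // Every subtask holds the same broadcast copy, so without the guard
      // each of them emits the full toggle list.
      val shouldEmit = !onlySubtaskZero || subtaskIndex == 0
      if (shouldEmit) toggles.toSeq.sortBy(_._1) else Seq.empty
    }
}
```

With two toggles and a parallelism of 4, the unguarded version emits 8 records, while the guarded one emits each toggle once.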

Is there any way to modify the keyed state from within the processBroadcastElement() method?

Thanks a lot for your help.

Regards,
Averell


(In reply to Fabian Hueske, Fri, May 10, 2019, 8:52 PM)

Re: How to export all not-null keyed ValueState

Fabian Hueske-2
Hi Averell,

I'd go with your approach. Any state access (given that you use RocksDB keyed state) or deduplication of messages is going to be more expensive than a simple cast.

Best, Fabian

(In reply to Averell Huyen Levan, Fri, May 10, 2019, 13:08)

Re: How to export all not-null keyed ValueState

Averell
Thank you very much, Fabian.

Regards,
Averell

(In reply to Fabian Hueske, Fri, May 10, 2019, 9:46 PM)