Hi folks
I'm extending a RichFlatMapFunction in order to use states on a keyed stream. Concerning this i have two questions: 1. I have a var state_item: ValueState[Option[String]] as a local variable in this class. Initialized with state_item = getRuntimeContext.getState(new ValueStateDescriptor..... in the open function. Is the field state_item for every key different? In other words if I have a key with val1 and val2 will these get two different states? 2. The open function contains a override def open(conf: Configuration) configuration. Is there a way to input a custom configuration in there? Thanks Simon
|
Hi Simon, regarding 1. yes, the value that you get from state_item.value() and that you set using state_item.update() is scoped to the key of the incoming element. regarding 2. the open(conf: Configuration) signature is legacy from how Functions used to work quite a while back. In the streaming API this Configuration is always empty. If you want to configure your user function you can have the values as fields in your class and pass them in the constructor. Cheers, Aljoscha On Fri, 20 May 2016 at 17:49 simon peyer <[hidden email]> wrote:
|
Hi Aljoscha Thanks for your reply. Regarding question 2, the web dashboard does provide a properties section, besides ( ) Whats the most common way to handle properties in flink? Is there a general way to go and any kind of integration in flink? --Simon
|
Hi Simon,
As Aljoscha said, the best way is to supply the configuration as class fields. Alternatively, if you overload the open(..) method, it should also show up in the Properties/Configuration tab on the Web interface. Cheers, Max On Mon, May 23, 2016 at 11:43 AM, simon peyer <[hidden email]> wrote: > Hi Aljoscha > > Thanks for your reply. > > Regarding question 2, the web dashboard does provide a properties section, > besides ( > > Plan > Timeline > Exceptions > Properties > Configuration > > > ) > > > Whats the most common way to handle properties in flink? > Is there a general way to go and any kind of integration in flink? > > --Simon > > > On 21 May 2016, at 10:44, Aljoscha Krettek <[hidden email]> wrote: > > Hi Simon, > regarding 1. yes, the value that you get from state_item.value() and that > you set using state_item.update() is scoped to the key of the incoming > element. > > regarding 2. the open(conf: Configuration) signature is legacy from how > Functions used to work quite a while back. In the streaming API this > Configuration is always empty. If you want to configure your user function > you can have the values as fields in your class and pass them in the > constructor. > > Cheers, > Aljoscha > > On Fri, 20 May 2016 at 17:49 simon peyer <[hidden email]> wrote: >> >> Hi folks >> >> I'm extending a RichFlatMapFunction in order to use states on a keyed >> stream. >> Concerning this i have two questions: >> >> 1. I have a var state_item: ValueState[Option[String]] as a local >> variable in this class. Initialized with state_item = >> getRuntimeContext.getState(new ValueStateDescriptor..... in the open >> function. >> Is the field state_item for every key different? >> >> In other words if I have a key with val1 and val2 will these get two >> different states? >> >> >> 2. The open function contains a override def open(conf: Configuration) >> configuration. >> Is there a way to input a custom configuration in there? >> >> Thanks Simon > > |
Hi Max Thanks a lot. I found now this solution: Passing it as a |
Hi Simon,
Great! I think this is only available in the DataSet API. Cheers, Max On Mon, May 23, 2016 at 2:23 PM, simon peyer <[hidden email]> wrote: > Hi Max > > Thanks a lot. > I found now this solution: > > Passing it as a Configuration object to single functions > > The example below shows how to pass the parameters as a Configuration object > to a user defined function. > > ParameterTool parameters = ParameterTool.fromArgs(args); > DataSet<Tuple2<String, Integer>> counts = text.flatMap(new > Tokenizer()).withParameters(parameters.getConfiguration()) > > In the Tokenizer, the object is now accessible in the open(Configuration > conf) method: > > public static final class Tokenizer extends RichFlatMapFunction<String, > Tuple2<String, Integer>> { > @Override > public void open(Configuration parameters) throws Exception { > parameters.getInteger("myInt", -1); > // .. do > > > > Cheers > Simon > > On 23 May 2016, at 14:01, Maximilian Michels <[hidden email]> wrote: > > Hi Simon, > > As Aljoscha said, the best way is to supply the configuration as class > fields. Alternatively, if you overload the open(..) method, it should > also show up in the Properties/Configuration tab on the Web interface. > > Cheers, > Max > > On Mon, May 23, 2016 at 11:43 AM, simon peyer <[hidden email]> > wrote: > > Hi Aljoscha > > Thanks for your reply. > > Regarding question 2, the web dashboard does provide a properties section, > besides ( > > Plan > Timeline > Exceptions > Properties > Configuration > > > ) > > > Whats the most common way to handle properties in flink? > Is there a general way to go and any kind of integration in flink? > > --Simon > > > On 21 May 2016, at 10:44, Aljoscha Krettek <[hidden email]> wrote: > > Hi Simon, > regarding 1. yes, the value that you get from state_item.value() and that > you set using state_item.update() is scoped to the key of the incoming > element. > > regarding 2. the open(conf: Configuration) signature is legacy from how > Functions used to work quite a while back. In the streaming API this > Configuration is always empty. If you want to configure your user function > you can have the values as fields in your class and pass them in the > constructor. > > Cheers, > Aljoscha > > On Fri, 20 May 2016 at 17:49 simon peyer <[hidden email]> wrote: > > > Hi folks > > I'm extending a RichFlatMapFunction in order to use states on a keyed > stream. > Concerning this i have two questions: > > 1. I have a var state_item: ValueState[Option[String]] as a local > variable in this class. Initialized with state_item = > getRuntimeContext.getState(new ValueStateDescriptor..... in the open > function. > Is the field state_item for every key different? > > In other words if I have a key with val1 and val2 will these get two > different states? > > > 2. The open function contains a override def open(conf: Configuration) > configuration. > Is there a way to input a custom configuration in there? > > Thanks Simon > > > > |
Free forum by Nabble | Edit this page |