Hi
While working on a RichFilterFunction implementation I was wondering, if there is a much better way to access configuration options read from file during startup. Actually, I am using getRuntimeContext().getExecutionConfig().getGlobalJobParameters() to get access to my settings. Reason for that is, that the Configuration parameter provided to the open function does not carry my settings. That is probably the case as I use this.executionEnvironment.getConfig().setGlobalJobParameters(cfg) to pass my configuration into the environment which in turn is not passed on as part of the open call - I found no other way to handle configuration ;-) My question is: who is responsible for calling the open function, where does the configuration parameter has its origins aka where is its content taken from and is it possible to define somewhere in the main program which configuration to pass into a specific operator? Best Christian |
Hi Christian, the open method is called by the Flink workers when the parallel tasks are initialized.You can set parameters in the operator config as follows: DataSet<String> text = ...DataSet<Tuple2<String, Integer> wc = text.flatMap(new Tokenizer()).getParameters().setString("myKey", "myVal"); 2016-01-13 10:29 GMT+01:00 Christian Kreutzfeldt <[hidden email]>:
|
Hi Fabian, thanks for your quick response. I just figured out that I forgot to mention a small but probably relevant detail: I am working with the streaming api. Although there is a way to access the overall job settings, I need a solution to "reduce" the view on configuration options available on operator level. For example, I would like to pass instance specific settings like an operator identifier but there might be different operators in the overall program. Best Christian 2016-01-13 10:52 GMT+01:00 Fabian Hueske <[hidden email]>:
|
Hi Christian,
For your implementation, would it suffice to pass a Configuration with your RichFilterFunction? You said the global job parameters are not passed on to your user function? Can you confirm this is a bug? Cheers, Max On Wed, Jan 13, 2016 at 10:59 AM, Christian Kreutzfeldt <[hidden email]> wrote: > Hi Fabian, > > thanks for your quick response. I just figured out that I forgot to mention > a small but probably relevant detail: I am working with the streaming api. > > Although there is a way to access the overall job settings, I need a > solution to "reduce" the view on configuration options available on operator > level. > For example, I would like to pass instance specific settings like an > operator identifier but there might be different operators in the overall > program. > > Best > Christian > > 2016-01-13 10:52 GMT+01:00 Fabian Hueske <[hidden email]>: >> >> Hi Christian, >> >> the open method is called by the Flink workers when the parallel tasks are >> initialized. >> The configuration parameter is the configuration object of the operator. >> You can set parameters in the operator config as follows: >> >> DataSet<String> text = ... >> DataSet<Tuple2<String, Integer> wc = text.flatMap(new >> Tokenizer()).getParameters().setString("myKey", "myVal"); >> >> Best, Fabian >> >> >> 2016-01-13 10:29 GMT+01:00 Christian Kreutzfeldt <[hidden email]>: >>> >>> Hi >>> >>> While working on a RichFilterFunction implementation I was wondering, if >>> there is a much better way to access configuration >>> options read from file during startup. Actually, I am using >>> getRuntimeContext().getExecutionConfig().getGlobalJobParameters() >>> to get access to my settings. >>> >>> Reason for that is, that the Configuration parameter provided to the open >>> function does not carry my settings. That is probably >>> the case as I use >>> this.executionEnvironment.getConfig().setGlobalJobParameters(cfg) to pass my >>> configuration into the environment >>> which in turn is not passed on as part of the open call - I found no >>> other way to handle configuration ;-) >>> >>> My question is: who is responsible for calling the open function, where >>> does the configuration parameter has its origins aka where >>> is its content taken from and is it possible to define somewhere in the >>> main program which configuration to pass into a specific operator? >>> >>> Best >>> Christian >> >> > |
Hi Max, maybe I explained it a bit mistakable ;-) I have a stream-based application which contains a RichFilterFunction implementation. The parent provides a lifecycle method open (open(Configuration)) which receives a Configuration object as input. I would like to use this call to pass options into the operator instance. Unfortunately, I found no hint where and how to provide the information such that I receive them at the described method. Actually, I am accessing the surrounding runtime context to retrieve the global job parameters where I extract the desired information from. But for some reasons I do not want the operator to receive its setup information from the provided Configuration instance ;-) That's why I am looking for the place where the configuration object is created and passed into the rich filter function. I would like to insert dedicated information for a dedicated filter instance. Best Christian 2016-01-18 12:30 GMT+01:00 Maximilian Michels <[hidden email]>: Hi Christian, |
Hi Christian, I think the DataStream API does not allow you to pass any parameter to the open(Configuration) method. That method is only used in the DataSet (Batch) API, and its use is discouraged. A much better option to pass a Configuration into your function is as follows: Configuration mapConf = new Configuration(); And in the Tokenizer: public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { This works as long as the type you're passing is serializable. On Mon, Jan 18, 2016 at 12:43 PM, Christian Kreutzfeldt <[hidden email]> wrote:
|
Hi Robert, using the constructor is actually the selected way. Using the existing lifecycle method was an idea to integrate it more with the existing framework design ;-) Best Christian 2016-01-18 13:38 GMT+01:00 Robert Metzger <[hidden email]>:
|
Free forum by Nabble | Edit this page |