Accessing configuration in RichFunction

mnxfst
Hi

While working on a RichFilterFunction implementation, I was wondering whether there is a better way to access configuration
options read from a file during startup. Currently, I am using getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
to get access to my settings.

The reason is that the Configuration parameter passed to the open method does not carry my settings. That is probably
because I use this.executionEnvironment.getConfig().setGlobalJobParameters(cfg) to pass my configuration into the environment,
and those global job parameters are not passed on as part of the open call. I found no other way to handle configuration ;-)

My question is: who is responsible for calling the open method, where does the configuration parameter originate (i.e., where
is its content taken from), and is it possible to define somewhere in the main program which configuration to pass into a specific operator?

Best
  Christian

Re: Accessing configuration in RichFunction

Fabian Hueske-2
Hi Christian,

the open method is called by the Flink workers when the parallel tasks are initialized.
The configuration parameter is the configuration object of the operator.
You can set parameters in the operator config as follows:

DataSet<String> text = ...;
Configuration params = new Configuration();
params.setString("myKey", "myVal");
DataSet<Tuple2<String, Integer>> wc = text.flatMap(new Tokenizer()).withParameters(params);

Best, Fabian
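To make the two configuration channels in this exchange concrete, here is a toy model in plain Java, not Flink code: a hypothetical runtime calls open() on each function instance with that operator's own settings before any records flow, while job-wide settings remain visible to every operator. All names (ToyRichFilter, ThresholdFilter, ToyRuntime, the "threshold" key) are invented for illustration, and handing the global parameters directly to open() is a simplification; in Flink they are reached through getRuntimeContext().getExecutionConfig().getGlobalJobParameters().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a rich function: open() runs once per parallel
// instance before any records are processed.
interface ToyRichFilter {
    void open(Map<String, String> operatorConfig, Map<String, String> globalParams);

    boolean filter(int value);
}

// Reads its threshold from its own operator config, falling back to a job-wide default.
class ThresholdFilter implements ToyRichFilter {
    private int threshold;

    @Override
    public void open(Map<String, String> operatorConfig, Map<String, String> globalParams) {
        String fallback = globalParams.getOrDefault("threshold", "0");
        this.threshold = Integer.parseInt(operatorConfig.getOrDefault("threshold", fallback));
    }

    @Override
    public boolean filter(int value) {
        return value > threshold;
    }
}

// Hypothetical worker: opens the function with this operator's config plus the
// shared global parameters, then pushes the input records through it.
final class ToyRuntime {
    private ToyRuntime() {}

    static List<Integer> run(ToyRichFilter fn, Map<String, String> operatorConfig,
                             Map<String, String> globalParams, List<Integer> input) {
        fn.open(operatorConfig, globalParams);
        List<Integer> out = new ArrayList<>();
        for (int v : input) {
            if (fn.filter(v)) {
                out.add(v);
            }
        }
        return out;
    }

    // Convenience for building small config maps from key/value pairs.
    static Map<String, String> conf(String... keyValues) {
        Map<String, String> m = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            m.put(keyValues[i], keyValues[i + 1]);
        }
        return m;
    }
}
```

In this sketch a per-operator value overrides the job-wide default; that precedence rule is a design choice of the example, not something Flink prescribes.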


In reply to: Christian Kreutzfeldt, 2016-01-13 10:29 GMT+01:00


Re: Accessing configuration in RichFunction

mnxfst
Hi Fabian,

thanks for your quick response. I just realized that I forgot to mention a small but probably relevant detail: I am working with the streaming API.

Although there is a way to access the overall job settings, I need a way to "reduce" the view of configuration options available at the operator level.
For example, I would like to pass instance-specific settings such as an operator identifier, but the overall program may contain several different operators.

Best
  Christian
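One way to get such an operator-level view without framework support is a key-naming convention: every setting in the global configuration is prefixed with a hypothetical operator identifier, and each function extracts only its own entries. A minimal plain-Java sketch, assuming the settings are available as a Map<String, String>; ScopedConfig and the "<operatorId>." prefix scheme are assumptions for illustration, not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: narrows a flat, job-wide settings map to one operator.
final class ScopedConfig {
    private ScopedConfig() {}

    // Returns the entries whose key starts with "<operatorId>.", with that prefix stripped.
    static Map<String, String> forOperator(Map<String, String> global, String operatorId) {
        String prefix = operatorId + ".";
        Map<String, String> view = new HashMap<>();
        for (Map.Entry<String, String> e : global.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                view.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return view;
    }
}
```

Inside a rich function, the map could be filled from whatever getGlobalJobParameters() returns (for example via its toMap() method), with the operator identifier passed in through the constructor.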

In reply to: Fabian Hueske, 2016-01-13 10:52 GMT+01:00



Re: Accessing configuration in RichFunction

Maximilian Michels
Hi Christian,

For your implementation, would it suffice to pass a Configuration with
your RichFilterFunction? You said the global job parameters are not
passed on to your user function? Can you confirm this is a bug?

Cheers,
Max

In reply to: Christian Kreutzfeldt, Wed, Jan 13, 2016 at 10:59 AM

Re: Accessing configuration in RichFunction

mnxfst
Hi Max,

maybe my explanation was a bit misleading ;-)

I have a stream-based application that contains a RichFilterFunction implementation. Its parent class provides a lifecycle method, open(Configuration), which receives a Configuration object as input. I would like to use this call to pass options into the operator instance.

Unfortunately, I found no hint where and how to provide the information so that I receive it in that method. Currently, I am accessing the surrounding runtime context to retrieve the global job parameters, from which I extract the desired information. But for several reasons I do not want the operator to receive its setup information from the provided Configuration instance ;-)

That's why I am looking for the place where the configuration object is created and passed into the rich filter function. I would like to insert dedicated information for a specific filter instance.

Best
  Christian


In reply to: Maximilian Michels, 2016-01-18 12:30 GMT+01:00


Re: Accessing configuration in RichFunction

rmetzger0
Hi Christian,

I think the DataStream API does not allow you to pass any parameters to the open(Configuration) method.
Passing parameters that way is only supported in the DataSet (batch) API, and its use is discouraged.

A much better option to pass a Configuration into your function is as follows:


Configuration mapConf = new Configuration();
mapConf.setDouble("something", 1.2);

DataStream<Tuple2<String, Integer>> counts =
    // split up the lines in pairs (2-tuples) containing: (word,1)
    text.flatMap(new Tokenizer(mapConf))
        // group by the tuple field "0" and sum up tuple field "1"
        .keyBy(0).sum(1);

And in the Tokenizer:

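public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    private final Configuration mapConf;  // shipped to the workers with the serialized function

    public Tokenizer(Configuration mapConf) {
        this.mapConf = mapConf;
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // read settings from mapConf here and emit (word, 1) pairs
    }
}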

This works as long as the type you're passing is serializable.
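The serializability requirement exists because the function object, including the fields set in its constructor, is serialized on the client and deserialized on the workers. The plain-Java sketch below imitates that round trip with standard Java serialization; ConfiguredTokenizer, BrokenFunction and Shipping are hypothetical names, and Flink's actual shipping code is more involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

// Hypothetical function object: configuration arrives through the constructor
// and is stored in a field, so it travels with the serialized instance.
class ConfiguredTokenizer implements Serializable {
    private static final long serialVersionUID = 1L;
    private final HashMap<String, String> conf;

    ConfiguredTokenizer(HashMap<String, String> conf) {
        this.conf = conf;
    }

    String get(String key) {
        return conf.get(key);
    }
}

// Holds a field whose runtime type is not Serializable, so shipping it must fail.
class BrokenFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Object handle = new Object(); // java.lang.Object is not Serializable
}

final class Shipping {
    private Shipping() {}

    // Serialize and deserialize, roughly what happens when a function instance
    // is sent from the client to a worker.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // NotSerializableException is an IOException and ends up here
            throw new IllegalStateException("not shippable: " + e, e);
        }
    }
}
```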



In reply to: Christian Kreutzfeldt, Mon, Jan 18, 2016 at 12:43 PM



Re: Accessing configuration in RichFunction

mnxfst
Hi Robert,

using the constructor is in fact the approach I chose. Using the existing lifecycle method was an idea to integrate more closely with the existing framework design ;-)

Best
  Christian
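The constructor and the lifecycle method can also be combined: the raw setting travels with the function through the constructor, and open() builds derived state on each parallel instance. A plain-Java sketch of that split, with RegexFilter and shipped() as hypothetical names and open() standing in for the framework's lifecycle hook; marking the derived field transient keeps it out of the serialized form, so it has to be rebuilt after shipping.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.regex.Pattern;

// Hypothetical filter: the raw setting arrives via the constructor and is shipped,
// while the derived state is rebuilt in open() on each instance.
class RegexFilter implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String regex;          // shipped with the function
    private transient Pattern compiled;  // derived state, not shipped

    RegexFilter(String regex) {
        this.regex = regex;
    }

    // Stand-in for the lifecycle hook called before any records are processed.
    void open() {
        compiled = Pattern.compile(regex);
    }

    boolean filter(String value) {
        if (compiled == null) {
            throw new IllegalStateException("open() was not called");
        }
        return compiled.matcher(value).matches();
    }

    // Serializes and deserializes this instance, roughly like shipping it to a worker.
    RegexFilter shipped() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(this);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (RegexFilter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```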

In reply to: Robert Metzger, 2016-01-18 13:38 GMT+01:00