[RichFlattMapfunction] Configuration File


simon peyer
Hi folks,

I'm extending a RichFlatMapFunction in order to use state on a keyed stream.
I have two questions about this:

1. I have a field var state_item: ValueState[Option[String]] in this class. It is initialized in the open function with state_item = getRuntimeContext.getState(new ValueStateDescriptor.....
Is the field state_item different for every key?

In other words, if I have the keys val1 and val2, will these get two different states?


2. The open function has the signature override def open(conf: Configuration).
Is there a way to pass a custom configuration in there?

Thanks,
Simon

Re: [RichFlattMapfunction] Configuration File

Aljoscha Krettek
Hi Simon,
Regarding 1: yes. The value that you get from state_item.value() and set using state_item.update() is scoped to the key of the incoming element.
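
To picture what "scoped to the key" means, here is a small toy model in plain Java. It is not Flink code and not how Flink is implemented; ToyValueState, CountPerKey and setCurrentKey are hypothetical names made up for this sketch. The only assumption is that the framework selects the key of the incoming element before the function runs, so a single state field transparently reads and writes a separate value per key.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT Flink's implementation. All names here are hypothetical.
class KeyedStateSketch {

    /** Stand-in for a ValueState handle: one object, but a separate value per key. */
    static final class ToyValueState<K, V> {
        private final Map<K, V> valuesByKey = new HashMap<>();
        private K currentKey;

        // In this model the "runtime" calls this before each element is processed.
        void setCurrentKey(K key) {
            currentKey = key;
        }

        // Returns null if nothing has been stored for the current key yet.
        V value() {
            return valuesByKey.get(currentKey);
        }

        void update(V newValue) {
            valuesByKey.put(currentKey, newValue);
        }
    }

    /** Counts elements per key using a single state field, like state_item in the question. */
    static final class CountPerKey {
        private final ToyValueState<String, Integer> count = new ToyValueState<>();

        int process(String key) {
            count.setCurrentKey(key); // assumed to be done by the framework in a real job
            Integer seen = count.value();
            int updated = (seen == null ? 0 : seen) + 1;
            count.update(updated);
            return updated;
        }
    }

    public static void main(String[] args) {
        CountPerKey fn = new CountPerKey();
        System.out.println(fn.process("val1")); // prints 1
        System.out.println(fn.process("val1")); // prints 2
        System.out.println(fn.process("val2")); // prints 1: val2 has its own value
    }
}
```

In this model val1 and val2 never see each other's count, even though the function holds only one state field.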

Regarding 2: the open(conf: Configuration) signature is a legacy of how functions used to work quite a while back. In the streaming API this Configuration is always empty. If you want to configure your user function, you can keep the values as fields in your class and pass them in through the constructor.
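
A minimal sketch of the constructor approach, written against the plain JDK rather than the Flink API so that it runs on its own. PrefixFunction, its fields and roundTrip are hypothetical names for illustration. The serialization round trip stands in for the function object being serialized and shipped to the workers; the point is that values stored in fields travel with the object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Plain-JDK sketch; PrefixFunction is a hypothetical stand-in for a user function.
class ConstructorConfigSketch {

    /** Configuration lives in final fields that are set once in the constructor. */
    static final class PrefixFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String prefix;
        private final int maxLength;

        PrefixFunction(String prefix, int maxLength) {
            this.prefix = prefix;
            this.maxLength = maxLength;
        }

        // Prepends the configured prefix and truncates to the configured length.
        String apply(String value) {
            String result = prefix + value;
            return result.length() <= maxLength ? result : result.substring(0, maxLength);
        }
    }

    /** Serializes and deserializes an object, standing in for shipping it to a worker. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        PrefixFunction configured = new PrefixFunction("user-", 8);
        PrefixFunction onWorker = roundTrip(configured);
        System.out.println(onWorker.apply("42"));      // prints user-42
        System.out.println(onWorker.apply("1234567")); // prints user-123
    }
}
```

The same shape would apply to a RichFlatMapFunction subclass: take the settings as constructor arguments, keep them in serializable fields, and read them in flatMap instead of relying on the Configuration passed to open.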

Cheers,
Aljoscha


Re: [RichFlattMapfunction] Configuration File

simon peyer
Hi Aljoscha

Thanks for your reply.

Regarding question 2, the web dashboard does provide a Properties section among its tabs (Plan, Timeline, Exceptions, Properties, Configuration).

What's the most common way to handle properties in Flink?
Is there a recommended general approach, and is there any kind of integration for this in Flink?

--Simon



Re: [RichFlattMapfunction] Configuration File

Maximilian Michels
Hi Simon,

As Aljoscha said, the best way is to supply the configuration as class
fields. Alternatively, if you override the open(..) method, the
configuration should also show up in the Properties/Configuration tab
of the web interface.

Cheers,
Max


Re: [RichFlattMapfunction] Configuration File

simon peyer
Hi Max

Thanks a lot.
I have now found this solution:

Passing it as a Configuration object to single functions

The example below shows how to pass the parameters as a Configuration object to a user defined function.

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration());

In the Tokenizer, the object is now accessible in the open(Configuration parameters) method:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
	@Override
	public void open(Configuration parameters) throws Exception {
		// Reads the value passed in via withParameters(); -1 is the default if "myInt" is not set.
		parameters.getInteger("myInt", -1);
		// .. do
	}

	// flatMap(...) omitted
}


Cheers
Simon


Re: [RichFlattMapfunction] Configuration File

Maximilian Michels
Hi Simon,

Great! I think this is only available in the DataSet API.

Cheers,
Max
