RichMapFunction in DataStream, how do I set the parameters received in open?

Luis Mariano Guerra
Hi! I'm trying to pass Configuration parameters to a RichMapFunction in Flink streaming and I can't find a way to do it.

I need to pass two strings to the MapFunction and I was getting a serialization error, so I tried RichMapFunction and open(), but I can't find a way to set the parameters I get in open(Configuration params).

withParameters seems to be defined only for batch, from what I've seen.

How do I make a serializable MapFunction that receives two strings as configuration from outside its definition?
Re: RichMapFunction in DataStream, how do I set the parameters received in open?

Chesnay Schepler
Hello,

You cannot pass a configuration in the Streaming API. This way of configuring functions is more of a relic from past times.

The common way to configure a function is to pass the parameters through the constructor and store the values in fields.
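
A minimal sketch of the constructor approach, written so that it runs without Flink on the classpath. `SimpleMapFunction` is a stand-in for Flink's `MapFunction` (which is likewise `Serializable`), and `PrefixSuffixMapper`, `ConstructorConfigDemo`, and `roundTrip` are hypothetical names used only for illustration. The round trip through Java serialization approximates what happens when a function object is shipped to a worker; it is not Flink's actual code path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for Flink's MapFunction (an assumption for this sketch).
interface SimpleMapFunction<I, O> extends Serializable {
    O map(I value) throws Exception;
}

// Hypothetical function configured with two strings through its constructor.
class PrefixSuffixMapper implements SimpleMapFunction<String, String> {
    private static final long serialVersionUID = 1L;

    // Strings are serializable, so these fields travel with the function object.
    private final String prefix;
    private final String suffix;

    PrefixSuffixMapper(String prefix, String suffix) {
        this.prefix = prefix;
        this.suffix = suffix;
    }

    @Override
    public String map(String value) {
        return prefix + value + suffix;
    }
}

class ConstructorConfigDemo {
    // Serializes and deserializes an object, returning the deserialized copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        PrefixSuffixMapper shipped = roundTrip(new PrefixSuffixMapper("<", ">"));
        System.out.println(shipped.map("x")); // prints "<x>"
    }
}
```

In an actual job the class would extend `RichMapFunction<String, String>` (or implement `MapFunction<String, String>`) instead of the stand-in interface, and an instance would be passed to `DataStream.map(...)`.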

Regards,
Chesnay

On 12.09.2016 18:27, Luis Mariano Guerra wrote:
Hi! I'm trying to pass Configuration parameters to a RichMapFunction in Flink streaming and I can't find a way to do it.

I need to pass two strings to the MapFunction and I was getting a serialization error, so I tried RichMapFunction and open(), but I can't find a way to set the parameters I get in open(Configuration params).

withParameters seems to be defined only for batch, from what I've seen.

How do I make a serializable MapFunction that receives two strings as configuration from outside its definition?

Re: RichMapFunction in DataStream, how do I set the parameters received in open?

Luis Mariano Guerra
On Mon, Sep 12, 2016 at 8:35 PM, Chesnay Schepler wrote:
Hello,

You cannot pass a configuration in the Streaming API. This way of configuring functions is more of a relic from past times.

The common way to configure a function is to pass the parameters through the constructor and store the values in fields.

Thanks!

I tried that before, but I was having problems with a field of the class not being serializable.

For anyone who may have this problem: mark the field as transient and initialize it on first use :)

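    // java.lang.reflect.Field is not Serializable, so it is excluded
    // from the serialized form of the function.
    private transient Field fieldAccessor;

    ...

    @Override
    public Tuple2<String, Integer> map(Object obj) throws Exception {
        // The transient field is null after deserialization;
        // look it up again on first use from the serializable strings.
        if (fieldAccessor == null) {
            Class<?> cls = Class.forName(className);
            fieldAccessor = cls.getDeclaredField(fieldName);
        }

        ...
    }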
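
A self-contained sketch of the same transient-plus-lazy-initialization pattern, runnable without Flink. `Event`, `FieldReader`, `LazyFieldDemo`, and `roundTrip` are hypothetical names introduced for illustration, and plain Java serialization stands in for whatever mechanism ships the function to a worker. The elided parts of the snippet above are not reconstructed here; this only shows that the reflective accessor is rebuilt after deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;

// Hypothetical record type whose field is read reflectively.
class Event {
    public String name;

    Event(String name) {
        this.name = name;
    }
}

// Configured with two strings; the non-serializable Field is rebuilt lazily.
class FieldReader implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String className;
    private final String fieldName;

    // Not part of the serialized form; null again after deserialization.
    private transient Field fieldAccessor;

    FieldReader(String className, String fieldName) {
        this.className = className;
        this.fieldName = fieldName;
    }

    Object read(Object obj) throws ReflectiveOperationException {
        if (fieldAccessor == null) {
            // First use on this copy: resolve the field from the stored strings.
            fieldAccessor = Class.forName(className).getDeclaredField(fieldName);
        }
        return fieldAccessor.get(obj);
    }
}

class LazyFieldDemo {
    // Serializes and deserializes an object, returning the deserialized copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FieldReader reader = roundTrip(new FieldReader(Event.class.getName(), "name"));
        System.out.println(reader.read(new Event("click"))); // prints "click"
    }
}
```

In a RichMapFunction, the same initialization could instead go in open(), which Flink calls once on each parallel instance before any map() call; the null check on first use works for plain MapFunction implementations as well.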