RichMapFunction related question

Camelia-Elena Ciolac
Hello,

Q1: If we run this operator on 4 nodes, is its open(Configuration) method executed once on each node?

Q2: Can we use fields (such as a counter) in the RichMapFunction class so that they are not shared between computation nodes, but are shared by all tuples that pass through this transformation on a given computation node?
Something like:

dataset1.map(
        new RichMapFunction<T1, T2>() {
            // counter kept as a field of the function object
            private int k;

            @Override
            public void open(Configuration config) {
                ...........
                k = 0;
            }

            @Override
            public T2 map(T1 tupin) {
                k++;
                return new T2(...., k);
            }
        }
);

where T1 and T2 stand for some tuple types or classes.

Thank you in advance!

Best regards,
Camelia


Re: RichMapFunction related question

Fabian Hueske
Hi Camelia,

In general, user-defined functions are assumed to have no side effects.
Sharing a counter between invocations of the user-defined function (map() in your case) is such a side effect.
Since the system gives no guarantees about which data is processed on which node (within the semantics of the operator, of course), an operator written like yours will not give deterministic results.

To answer your questions:
1) open() is called exactly once for each parallel operator instance. There might be more than one operator instance on each node (depending on the number of configured slots). All instances on the same node run within the same JVM, so be careful with singletons or other shared objects.
2) Each parallel operator instance has its own member variables, i.e., k will not be shared with other operator instances. However, as pointed out above, this operator does not return deterministic results.
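
The two answers above can be modelled in plain Java without any Flink dependency. This is only a sketch under stated assumptions: `PerInstanceCounterSketch`, `CountingMapper`, `SHARED_CALLS` and `run` are hypothetical names, and the partition lists stand in for whatever assignment of records to parallel instances the runtime happens to choose. It shows that an instance field such as `k` is private to each instance, that a static field is shared by all instances in the same JVM, and that the emitted counter values change when the same records are distributed differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java model (no Flink): one CountingMapper object per parallel instance. */
final class PerInstanceCounterSketch {

    /** Hypothetical stand-in for the RichMapFunction from the question. */
    static final class CountingMapper {
        // Static state is shared by every instance living in the same JVM.
        static final AtomicInteger SHARED_CALLS = new AtomicInteger();

        // Instance state: each parallel instance has its own k.
        private int k;

        /** Called once per instance before any record is processed. */
        void open() {
            k = 0;
        }

        String map(String value) {
            k++;
            SHARED_CALLS.incrementAndGet();
            return value + ":" + k;
        }
    }

    /** Runs one mapper instance per partition and concatenates the results in partition order. */
    static List<String> run(List<List<String>> partitions) {
        List<String> out = new ArrayList<>();
        for (List<String> partition : partitions) {
            CountingMapper mapper = new CountingMapper(); // own copy of k
            mapper.open();
            for (String value : partition) {
                out.add(mapper.map(value));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The same four records, assigned to two instances in two different ways.
        System.out.println(run(Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c", "d")))); // prints [a:1, b:2, c:1, d:2]
        System.out.println(run(Arrays.asList(
                Arrays.asList("a"), Arrays.asList("b", "c", "d")))); // prints [a:1, b:1, c:2, d:3]
    }
}
```

The record "b" receives counter value 2 in the first run and 1 in the second, which is the non-determinism described above: the value depends on which instance a record lands on, not on the record itself.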

If you want to map over all elements of a partition, mapPartition might be a better fit than map.
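
To illustrate the shape of a partition-wise function, here is a minimal plain-Java sketch, again without Flink. The names `MapPartitionSketch` and `numberWithinPartition` are hypothetical, and `java.util.function.Consumer` is used as a stand-in for Flink's `Collector`. The function receives all elements of one partition in a single call, so the counter can be a local variable instead of a field that survives between invocations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model (no Flink) of a function that is called once per partition. */
final class MapPartitionSketch {

    /**
     * Numbers the elements of one partition.
     * The counter k is local to this call, so no state leaks between partitions.
     */
    static void numberWithinPartition(Iterable<String> values, Consumer<String> out) {
        int k = 0;
        for (String value : values) {
            k++;
            out.accept(value + ":" + k);
        }
    }

    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        numberWithinPartition(Arrays.asList("a", "b", "c"), collected::add);
        System.out.println(collected); // prints [a:1, b:2, c:3]
    }
}
```

The numbering is still relative to the partition, so the overall result continues to depend on how records are partitioned; the sketch only makes that scope explicit.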

Best, Fabian

(In reply to Camelia-Elena Ciolac's message of 2014-11-04 16:16 GMT+01:00.)

Re: RichMapFunction related question

Camelia-Elena Ciolac
First of all, thank you for the detailed explanation.

I tried to use MapPartitionFunction instead, as it fits the case better, but now I can no longer use the open(Configuration) method as I could with RichMapFunction.
I need a workaround to read the broadcast variable, to obtain the same result as:
   getRuntimeContext().getBroadcastVariable("parameters");

Is it somehow possible to access the broadcast variable from inside the mapPartition?

Thank you!
Camelia



(In reply to Fabian Hueske's message of Tuesday, 4 November 2014, 16:50:37.)



Re: RichMapFunction related question

Fabian Hueske
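There should also be a rich function variant for mapPartition (RichMapPartitionFunction), which gives you open(Configuration) and getRuntimeContext() as before.
Another option is to initialize in the default constructor, if the initialization does not depend on the Configuration object.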
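
The difference between constructor initialization and open() can be sketched in plain Java. The assumption here is that the function object is created once in the client program, serialized, and handed to each parallel instance as a deserialized copy. `ConstructorInitSketch`, `Fn` and `shipCopy` are hypothetical names, and Java serialization to a byte array stands in for shipping the function to a worker. Fields set in the constructor travel with the copy, while transient fields arrive empty and need to be set up again in open().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Plain-Java model (no Flink) of a function object that is serialized before it runs. */
final class ConstructorInitSketch {

    static final class Fn implements Serializable {
        private static final long serialVersionUID = 1L;

        private final int offset;                 // set in the constructor, serialized with the object
        private transient StringBuilder scratch;  // not serialized, must be created in open()

        Fn(int offset) {
            this.offset = offset;
            this.scratch = new StringBuilder();
        }

        /** Per-instance setup, run on the copy after it has been deserialized. */
        void open() {
            scratch = new StringBuilder();
        }

        int offset() {
            return offset;
        }

        boolean hasScratch() {
            return scratch != null;
        }
    }

    /** Serializes and deserializes the function, as if shipping it to one parallel instance. */
    static Fn shipCopy(Fn original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(original);
            }
            try (ObjectInputStream ois =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Fn) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Fn copy = shipCopy(new Fn(42));
        System.out.println(copy.offset());     // prints 42
        System.out.println(copy.hasScratch()); // prints false
        copy.open();
        System.out.println(copy.hasScratch()); // prints true
    }
}
```

Under that assumption, constructor initialization suits small serializable values, while anything that depends on the runtime context, such as a broadcast variable, still belongs in open().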

(In reply to Camelia-Elena Ciolac's message of Tuesday, 4 November 2014.)