Hello,

I have two questions regarding the RichMapFunction, starting from its use in https://github.com/apache/incubator-flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java .

Q1: If we run this operator on 4 nodes, does its open(Configuration) method execute once on each node?

Q2: Can we use fields (such as a counter) in the RichMapFunction class so that they are not shared between computation nodes, but are shared across all tuples that pass through this transformation on a given computation node? Something like:

    dataset1.map(new RichMapFunction<T1, T2>() {
        private int k;

        public void open(Configuration config) {
            ...........
            k = 0;
        }

        public T2 map(T1 tupin) {
            k++;
            return new T2(...., k);
        }
    });

where T1 and T2 stand for some tuple types or classes.

Thank you in advance!

Best regards,
Camelia
Hi Camelia,

in general, it is assumed that user-defined operations do not have side effects. Sharing a counter between invocations of the user-defined function (map() in your case) is such a side effect. Since the system gives no guarantees about which data is processed on which node (within the semantics of the operator, of course), doing what you did will not give deterministic results.

To answer your questions:

1) open() is called exactly once for each parallel operator instance. There might be more than one operator instance on each node (depending on the number of configured slots). All instances on the same node run within the same JVM, so be careful with singletons or other shared objects.

2) Each parallel operator instance has its own member variables, i.e., k will not be shared with other operator instances. However, this operator does not return deterministic results, as pointed out above.

If you want to map over all elements of a partition, mapPartition might be a better fit than map.

Best, Fabian

(In reply to the message from Camelia-Elena Ciolac <[hidden email]>, 2014-11-04 16:16 GMT+01:00.)
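The point about per-instance member variables can be illustrated without Flink at all. The following is a minimal plain-Java sketch, not Flink code: CountingMapper and run are hypothetical stand-ins for one parallel operator instance and for the runtime that feeds it. The sketch assumes the runtime creates one instance per partition and calls open() once on each, as described in the reply above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PerInstanceCounterDemo {

    /** Hypothetical stand-in for one parallel instance of the user function (not a Flink class). */
    static class CountingMapper {
        private int k; // member variable, private to this instance

        void open() {
            k = 0; // called once per instance, before any element is mapped
        }

        String map(String in) {
            k++; // side effect: the result depends on how many elements this instance has seen
            return in + ":" + k;
        }
    }

    /** Simulates the runtime: one mapper instance per partition, open() called once on each. */
    public static List<String> run(List<List<String>> partitions) {
        List<String> out = new ArrayList<>();
        for (List<String> partition : partitions) {
            CountingMapper instance = new CountingMapper();
            instance.open();
            for (String element : partition) {
                out.add(instance.map(element));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same four elements, two different ways of distributing them over two instances.
        System.out.println(run(Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c", "d")))); // [a:1, b:2, c:1, d:2]
        System.out.println(run(Arrays.asList(
                Arrays.asList("a"), Arrays.asList("b", "c", "d")))); // [a:1, b:1, c:2, d:3]
    }
}
```

The counter is never shared between instances, but the value attached to each element changes with the partitioning, which is the non-determinism the reply warns about.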
First of all, thank you for the detailed explanation.

I tried to use the MapPartitionFunction instead, as it fits the case better, but now I can no longer use the open(Configuration) method as I could with the RichMapFunction. I need a workaround to read the broadcast variable, to obtain the same result as:

    getRuntimeContext().getBroadcastVariable("parameters");

Is it somehow possible to access the broadcast variable from inside mapPartition?

Thank you!
Camelia

From: "Fabian Hueske" <[hidden email]>
There should also be a rich function stub for mapPartition.
Another option for initialization is the constructor, if the initialization does not depend on the Configuration object.
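As a rough sketch of the rich variant mentioned above: Flink's Java API provides RichMapPartitionFunction, which offers open(Configuration) and getRuntimeContext() just like RichMapFunction. The class name ScaleByParameter, the Double element type, and the sample data below are illustrative assumptions and not from this thread; only the broadcast name "parameters" comes from the question. The sketch also assumes a Flink release with the DataSet API in which print() triggers execution; on older releases an explicit env.execute() is needed.

```java
import java.util.List;

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class BroadcastInMapPartition {

    /** Hypothetical example function: multiplies every value by a broadcast factor. */
    public static class ScaleByParameter extends RichMapPartitionFunction<Double, Double> {

        private double factor;

        @Override
        public void open(Configuration config) throws Exception {
            // Called once per parallel instance; the broadcast set is available here.
            List<Double> parameters = getRuntimeContext().getBroadcastVariable("parameters");
            factor = parameters.get(0);
        }

        @Override
        public void mapPartition(Iterable<Double> values, Collector<Double> out) {
            // Called with all elements of the partition assigned to this instance.
            for (Double value : values) {
                out.collect(value * factor);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Double> parameters = env.fromElements(2.0);  // assumed sample broadcast data
        DataSet<Double> data = env.fromElements(1.0, 2.0, 3.0); // assumed sample input

        data.mapPartition(new ScaleByParameter())
            .withBroadcastSet(parameters, "parameters") // same name as used in open()
            .print();
    }
}
```

The broadcast set is registered on the operator with withBroadcastSet and read back by the same name in open(), so the function body itself stays free of setup code.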
On Tuesday, November 4, 2014, Camelia-Elena Ciolac wrote: