Hello,
I have a DataSet<UserDefinedType>, where each element is roughly one record in a DataSet or a file.

I am using a map transformation on this DataSet to compute a variable (the coefficients of a linear regression, stored as a double[]).

The issue is that the variable is updated for every record, and I am struggling to maintain its state for the next record.

Put simply: for the first record the variable's values are 0.0. After the first record the variable is updated, and I have to pass this updated variable on to the second record, and so on for all records in the DataSet.

Any suggestions on how to maintain the state of a variable?

Regards,
Ravikumar
|
Hi Ravikumar,

Flink's operators are stateful, so you can simply create a variable in your mapper to keep the state around. However, every mapper instance will have its own state, which is determined by the records that are sent to that mapper instance. If you need a global state, you have to set the parallelism to 1.

Cheers,
Till

On Wed, Jun 8, 2016 at 5:08 PM, Ravikumar Hawaldar <[hidden email]> wrote:
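A minimal sketch of the per-instance state Till describes, written in plain Java so that it runs without Flink on the classpath. The class name `StatefulMapperSketch`, the learning rate, and the stochastic-gradient update rule are all assumptions made for illustration; the thread does not say how the coefficients are actually updated. The point is only that a field of the mapper object carries over from one record to the next, and that two instances never see each other's updates.

```java
import java.util.Arrays;

// Hypothetical stand-in for a map function whose field survives across records.
// Plain Java only; no Flink classes are used here.
final class StatefulMapperSketch {
    // State kept by this instance; starts at 0.0 as in the question.
    // Layout is an assumption: {intercept, slope} for the model y = w0 + w1 * x.
    private final double[] coefficients = new double[2];
    private final double learningRate;

    StatefulMapperSketch(double learningRate) {
        this.learningRate = learningRate;
    }

    // Processes one record (x, y), updates the state, and returns a copy of it.
    double[] map(double x, double y) {
        double error = (coefficients[0] + coefficients[1] * x) - y;
        coefficients[0] -= learningRate * error;
        coefficients[1] -= learningRate * error * x;
        return coefficients.clone();
    }

    public static void main(String[] args) {
        StatefulMapperSketch a = new StatefulMapperSketch(0.1);
        StatefulMapperSketch b = new StatefulMapperSketch(0.1);
        a.map(1.0, 2.0);
        // The second call on "a" starts from the state left by the first call.
        System.out.println(Arrays.toString(a.map(1.0, 2.0)));
        // "b" is a separate instance and still starts from 0.0.
        System.out.println(Arrays.toString(b.map(1.0, 2.0)));
    }
}
```

With a parallelism greater than 1, each parallel instance would behave like `a` and `b` here: the same code, but independent state fed by different records.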
|
Hi Till,

Thank you for your answer. I have a couple of questions:

1) Setting the parallelism of a single map function is fine in local execution, but will it behave the same way in distributed execution?
2) Is there any other way apart from setting the parallelism? Something like Spark's aggregate function?
3) Is it necessary to call the execute function after the transformations, or does execution start as soon as an action is encountered (similar to Spark)?
4) Can I create a global execution environment (either local or distributed) for different Flink programs in a module?
5) How can I make the records arrive in sequence for a map or any other operator?

Regards,
Ravikumar

On 8 June 2016 at 21:14, Till Rohrmann <[hidden email]> wrote:
|
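Hi Ravikumar,

I'll try to answer your questions:

1) If you set the parallelism of a map function to 1, there will be only a single instance of that function, regardless of whether it is executed locally or remotely in a cluster.
2) Flink also supports aggregations (reduce, groupReduce, combine, ...). However, I do not see how this would help with a stateful map function.
3) In Flink DataSet programs you usually construct the complete program and call execute() after you have defined your sinks. There are two exceptions: print() and collect(), which both add special sinks and immediately execute your program. print() prints the result to the stdout of the submitting client, and collect() fetches a DataSet as a collection.
4) I am not sure I understood your question. When you obtain an ExecutionEnvironment with ExecutionEnvironment.getExecutionEnvironment(), the type of the returned environment depends on the context in which the program was executed. It can be a local environment if the program is executed from within an IDE, or a remote execution environment if the program is executed via the CLI client and shipped to a remote cluster.
5) A map operator processes records one after the other, i.e., as a sequence. If you need a certain order, you can call DataSet.sortPartition() to locally sort the partition.

Hope that helps,
Fabian

2016-06-09 12:23 GMT+02:00 Ravikumar Hawaldar <[hidden email]>: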
|
Hi Fabian,

Thank you for your answers.

1) If there is only a single instance of that function, it defeats the purpose of distributed execution; correct me if I am wrong. So if I run with parallelism 1 on a cluster, does that mean it will execute on only one node?
2) What I mean is: when a map operator returns a variable, is there any other function which takes that updated variable and returns it to all instances of the map?
3) Question cleared.
4) My question was whether I can use the same ExecutionEnvironment for all Flink programs in a module.
5) Question cleared.

Regards,
Ravikumar

On 9 June 2016 at 17:58, Fabian Hueske <[hidden email]> wrote:
|
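Hi,

1) Yes, that is correct. If you set the parallelism of an operator to 1, it is executed on only a single node. Whether that is acceptable depends on your application: either you need a global state, or multiple local states are OK.
2) Flink programs follow the concept of a data flow. There is no communication between parallel instances of a task, i.e., all four tasks of a MapOperator with parallelism 4 cannot talk to each other. You might want to take a look at Flink's iteration operators. With these you can feed data back into a previous operator [1].
4) Yes, that should work.

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html

2016-06-09 15:01 GMT+02:00 Ravikumar Hawaldar <[hidden email]>: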
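One way to keep the work distributed while still maintaining a single coefficient vector is an iterative scheme: every partition computes a partial result from the same current coefficients, the partial results are summed, and the updated coefficients are fed into the next pass. The sketch below simulates that idea in plain Java with batch gradient descent. It does not use Flink's iteration operators, and the class name `IterativeRegressionSketch`, the two-coefficient model, the learning rate, and the iteration count are all assumptions chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of an iterative scheme; no Flink classes are used.
final class IterativeRegressionSketch {

    // Gradient of the squared error over one partition, given the current coefficients.
    // Each row is {x, y}; the assumed model is y = w[0] + w[1] * x.
    static double[] partialGradient(double[][] partition, double[] w) {
        double[] g = new double[2];
        for (double[] row : partition) {
            double error = (w[0] + w[1] * row[0]) - row[1];
            g[0] += error;
            g[1] += error * row[0];
        }
        return g;
    }

    // One pass per iteration: every partition sees the same w, the partial
    // gradients are summed, and the updated w is fed into the next pass.
    static double[] fit(List<double[][]> partitions, int iterations, double learningRate) {
        double[] w = new double[2]; // starts at 0.0
        long n = 0;
        for (double[][] p : partitions) {
            n += p.length;
        }
        for (int i = 0; i < iterations; i++) {
            double[] total = new double[2];
            for (double[][] p : partitions) { // conceptually, these run in parallel
                double[] g = partialGradient(p, w);
                total[0] += g[0];
                total[1] += g[1];
            }
            w = new double[] {
                w[0] - learningRate * total[0] / n,
                w[1] - learningRate * total[1] / n
            };
        }
        return w;
    }

    public static void main(String[] args) {
        // Points on the line y = 1 + 2x, split across two partitions.
        List<double[][]> partitions = Arrays.asList(
            new double[][] { {0, 1}, {1, 3} },
            new double[][] { {2, 5}, {3, 7} });
        System.out.println(Arrays.toString(fit(partitions, 2000, 0.1)));
    }
}
```

Note the trade-off: the coefficients are updated once per pass over the data rather than once per record, so this is a different algorithm from the strictly sequential per-record update described in the original question.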
|
Hi Fabian,

Thank you for your help.

I want my Flink application to be distributed, and I also want it to support updating the variable (the coefficients of the linear regression). How would you do that?

The problem with iterations is that they expect a DataSet of the same type to be fed back, and my variable is just a double[]. Otherwise I would have to map every record to a Tuple2 carrying the double[] and then try iterations, but I am fairly sure this won't work either.

Can I use closures or lambdas to maintain global state?

Regards,
Ravikumar

On 9 June 2016 at 20:17, Fabian Hueske <[hidden email]> wrote:
|
Hi Ravikumar,
In short: no, you can't use closures to maintain a global state. If you want to keep an always-global state, you'll have to use parallelism 1 or an external data store to hold that state.

Is it possible to break up your global state into a set of local states which can be combined in the end? That way, you can take advantage of distributed parallel processing.

Cheers,
Max

On Fri, Jun 10, 2016 at 8:28 AM, Ravikumar Hawaldar <[hidden email]> wrote:
> On 9 June 2016 at 20:17, Fabian Hueske <[hidden email]> wrote:
>> Hi,
>>
>> 1) Yes, that is correct. If you set the parallelism of an operator to 1, it
>> is executed on only a single node. Whether that is acceptable depends on your
>> application: either you need a global state, or multiple local states are OK.
>> 2) Flink programs follow the concept of a data flow. There is no
>> communication between parallel instances of a task, i.e., all four tasks of
>> a MapOperator with parallelism 4 cannot talk to each other. You might want
>> to take a look at Flink's iteration operators. With these you can feed data
>> back into a previous operator [1].
>> 4) Yes, that should work.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html
>>
>>> On 9 June 2016 at 17:58, Fabian Hueske <[hidden email]> wrote:
>>>> Hi Ravikumar,
>>>>
>>>> I'll try to answer your questions:
>>>> 1) If you set the parallelism of a map function to 1, there will be only
>>>> a single instance of that function, regardless of whether it is executed
>>>> locally or remotely in a cluster.
>>>> 2) Flink also supports aggregations (reduce, groupReduce, combine, ...).
>>>> However, I do not see how this would help with a stateful map function.
>>>> 3) In Flink DataSet programs you usually construct the complete program
>>>> and call execute() after you have defined your sinks. There are two
>>>> exceptions: print() and collect(), which both add special sinks and
>>>> immediately execute your program. print() prints the result to the stdout
>>>> of the submitting client, and collect() fetches a DataSet as a collection.
>>>> 4) I am not sure I understood your question. When you obtain an
>>>> ExecutionEnvironment with ExecutionEnvironment.getExecutionEnvironment(),
>>>> the type of the returned environment depends on the context in which the
>>>> program was executed. It can be a local environment if the program is
>>>> executed from within an IDE, or a remote execution environment if the
>>>> program is executed via the CLI client and shipped to a remote cluster.
>>>> 5) A map operator processes records one after the other, i.e., as a
>>>> sequence. If you need a certain order, you can call DataSet.sortPartition()
>>>> to locally sort the partition.
>>>>
>>>> Hope that helps,
>>>> Fabian
|
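Max's question about splitting the global state into local states that are combined at the end can be illustrated for the simplest case. The sketch below is plain Java with no Flink classes, and it swaps the per-record coefficient update for closed-form least squares on a single feature, which is an assumption and not the algorithm used in this thread. The class name `RegressionStats` and its fields are hypothetical. The useful property is that the local state consists only of sums, so merging two states is just addition, and `merge` returns a new object with the same (T, T) -> T shape as a reduce function.

```java
// Plain-Java sketch of mergeable local state; no Flink classes are used.
final class RegressionStats {
    private long n;
    private double sumX, sumY, sumXX, sumXY;

    // Folds one record (x, y) into this local state.
    RegressionStats add(double x, double y) {
        n++;
        sumX += x;
        sumY += y;
        sumXX += x * x;
        sumXY += x * y;
        return this;
    }

    // Combines two local states into a NEW object; neither input is modified.
    static RegressionStats merge(RegressionStats a, RegressionStats b) {
        RegressionStats out = new RegressionStats();
        out.n = a.n + b.n;
        out.sumX = a.sumX + b.sumX;
        out.sumY = a.sumY + b.sumY;
        out.sumXX = a.sumXX + b.sumXX;
        out.sumXY = a.sumXY + b.sumXY;
        return out;
    }

    // Closed-form least squares for y = intercept + slope * x.
    // Returns {intercept, slope}; undefined if all x values are identical.
    double[] coefficients() {
        double denom = n * sumXX - sumX * sumX;
        double slope = (n * sumXY - sumX * sumY) / denom;
        double intercept = (sumY - slope * sumX) / n;
        return new double[] { intercept, slope };
    }

    public static void main(String[] args) {
        // Two "partitions" of points on the line y = 1 + 2x.
        RegressionStats left = new RegressionStats().add(0, 1).add(1, 3);
        RegressionStats right = new RegressionStats().add(2, 5).add(3, 7);
        double[] c = merge(left, right).coefficients();
        System.out.println("intercept=" + c[0] + ", slope=" + c[1]);
    }
}
```

Whether a real algorithm can be restructured this way depends on whether its state can be expressed as quantities that combine associatively; a strictly order-dependent update cannot.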
Hi Maximilian,

Thank you for the response. Yes, it's possible to break up the global state, but it's very tricky to merge two local state variables, and I would also have to refactor my algorithm logic.

Is there a way to create a new object every time in the reduce function so that I can assign the computed variable to it, like Spark's aggregate?

Regards,
Ravikumar

On 13 June 2016 at 15:24, Maximilian Michels <[hidden email]> wrote: