How to maintain the state of a variable in a map transformation.

Ravikumar Hawaldar
Hello,

I have a DataSet<UserDefinedType>, where UserDefinedType roughly corresponds to a record in a DataSet or a file.

I am using a map transformation on this DataSet to compute a variable (the coefficients of a linear regression, stored as a double[]).

The issue is that the variable is updated for every record, and I am struggling to maintain its state for the next record.

Put simply, the variable values are 0.0 for the first record. After the first record the variable is updated, and I have to pass the updated variable on to the second record, and so on for all records in the DataSet.

Any suggestions on how to maintain the state of a variable?


Regards,
Ravikumar

Re: How to maintain the state of a variable in a map transformation.

Till Rohrmann
Hi Ravikumar,

Flink's operators are stateful, so you can simply create a variable in your mapper to keep the state around. However, every mapper instance will have its own state, which is determined by the records sent to that instance. If you need a global state, you have to set the parallelism to 1.
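
To illustrate the point about per-instance state, here is a minimal sketch in plain Java. It does not use Flink: StatefulMapper is a hypothetical stand-in for a map function that keeps the coefficients in an instance field, Point is an assumed record type, and the update rule (one stochastic-gradient step per record) is only a guess at what the per-record update might look like. Running one instance over all records mimics parallelism 1; running separate instances over sub-lists mimics parallel instances that each see only their own records.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch: Flink is NOT used here. StatefulMapper is a hypothetical
// stand-in for a map function whose instance field holds the state.
class StatefulMapperSketch {

    /** One record (hypothetical type): x is the single feature, y the target. */
    static final class Point {
        final double x;
        final double y;

        Point(double x, double y) {
            this.x = x;
            this.y = y;
        }
    }

    /** Keeps {intercept, slope} in a field and updates it once per record. */
    static final class StatefulMapper {
        private final double[] coefficients = {0.0, 0.0}; // state starts at 0.0
        private final double learningRate;

        StatefulMapper(double learningRate) {
            this.learningRate = learningRate;
        }

        /** Assumed update rule: one gradient step; returns a copy of the new state. */
        double[] map(Point p) {
            double error = coefficients[0] + coefficients[1] * p.x - p.y;
            coefficients[0] -= learningRate * error;
            coefficients[1] -= learningRate * error * p.x;
            return coefficients.clone();
        }
    }

    /** Feeds the records, in order, to ONE mapper instance and returns its final state. */
    static double[] runSingleInstance(List<Point> records, double learningRate) {
        StatefulMapper mapper = new StatefulMapper(learningRate);
        double[] last = {0.0, 0.0};
        for (Point p : records) {
            last = mapper.map(p);
        }
        return last;
    }

    public static void main(String[] args) {
        List<Point> data = Arrays.asList(
                new Point(1, 3), new Point(2, 5), new Point(3, 7), new Point(4, 9));

        // "Parallelism 1": a single instance sees every record, so the state is global.
        System.out.println(Arrays.toString(runSingleInstance(data, 0.05)));

        // "Parallelism 2": each instance only sees its own records and keeps its own state.
        System.out.println(Arrays.toString(runSingleInstance(data.subList(0, 2), 0.05)));
        System.out.println(Arrays.toString(runSingleInstance(data.subList(2, 4), 0.05)));
    }
}
```

In an actual Flink job the single-instance case would presumably be obtained by calling setParallelism(1) on the map operator, at the cost of processing all records in one task.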

Cheers,
Till

On Wed, Jun 8, 2016 at 5:08 PM, Ravikumar Hawaldar <[hidden email]> wrote:

Re: How to maintain the state of a variable in a map transformation.

Ravikumar Hawaldar
Hi Till, thank you for your answer. I have a couple of questions:

1) Setting the parallelism on a single map function is fine in local execution, but will it work the same way in distributed execution?

2) Is there any other way apart from setting the parallelism, like Spark's aggregate function?

3) Is it necessary to call the execute function after the transformations, or does execution start as soon as an action is encountered (similar to Spark)?

4) Can I create a global execution environment (either local or distributed) for different Flink programs in a module?

5) How can I make the records arrive in sequence for a map or any other operator?


Regards,
Ravikumar


On 8 June 2016 at 21:14, Till Rohrmann <[hidden email]> wrote:

Re: How to maintain the state of a variable in a map transformation.

Fabian Hueske-2
Hi Ravikumar,

I'll try to answer your questions:
1) If you set the parallelism of a map function to 1, there will be only a single instance of that function, regardless of whether it is executed locally or remotely in a cluster.
2) Flink also supports aggregations (reduce, groupReduce, combine, ...). However, I do not see how this would help with a stateful map function.
3) In Flink DataSet programs you usually construct the complete program and call execute() after you have defined your sinks. There are two exceptions: print() and collect(), which both add special sinks and immediately execute your program. print() prints the result to the stdout of the submitting client, and collect() fetches a DataSet as a collection.
4) I am not sure I understood your question. When you obtain an ExecutionEnvironment with ExecutionEnvironment.getExecutionEnvironment(), the type of the returned environment depends on the context in which the program is executed. It can be a local environment if the program is executed from within an IDE, or a remote environment if the program is submitted via the CLI client and shipped to a remote cluster.
5) A map operator processes records one after the other, i.e., as a sequence. If you need a certain order, you can call DataSet.sortPartition() to locally sort each partition.

Hope that helps,
Fabian

2016-06-09 12:23 GMT+02:00 Ravikumar Hawaldar <[hidden email]>:

Re: How to maintain the state of a variable in a map transformation.

Ravikumar Hawaldar
Hi Fabian, thank you for your answers.

1) If there is only a single instance of that function, it defeats the purpose of distributed processing; correct me if I am wrong. So if I run with parallelism 1 on a cluster, does that mean it will execute on only one node?

2) What I mean is: when a map operator returns a variable, is there any other function that takes the updated variable and returns it to all instances of the map?

3) Question cleared.

4) My question was whether I can use the same ExecutionEnvironment for all Flink programs in a module.

5) Question cleared.


Regards
Ravikumar



On 9 June 2016 at 17:58, Fabian Hueske <[hidden email]> wrote:

Re: How to maintain the state of a variable in a map transformation.

Fabian Hueske-2
Hi,

1) Yes, that is correct. If you set the parallelism of an operator to 1, it is executed on only a single node. Whether you need a global state or whether multiple local states are OK depends on your application.
2) Flink programs follow the concept of a data flow. There is no communication between parallel instances of a task, i.e., the four tasks of a MapOperator with parallelism 4 cannot talk to each other. You might want to take a look at Flink's iteration operators. With these you can feed data back into a previous operator [1].
4) Yes, that should work.

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html
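
The feedback idea can be sketched without Flink as follows. This is only an illustration under assumptions: the records are {x, y} pairs, the model is y = intercept + slope * x, and the update is plain batch gradient descent, which may differ from the algorithm discussed in this thread. All method names are made up. The point is the shape of the data flow: what is fed back into the next iteration is the small coefficient array, while the partitions of the input data stay in place and only contribute partial gradients that are summed.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the data flow only; no Flink classes are used.
class IterationSketch {

    /** Partial result of one partition: {sum of errors, sum of error * x, record count}. */
    static double[] partialGradient(double[][] partition, double[] weights) {
        double[] g = new double[3];
        for (double[] r : partition) { // r = {x, y}
            double error = weights[0] + weights[1] * r[0] - r[1];
            g[0] += error;
            g[1] += error * r[0];
            g[2] += 1;
        }
        return g;
    }

    /** Runs a fixed number of iterations; only the weights are fed back each round. */
    static double[] fit(List<double[][]> partitions, int iterations, double learningRate) {
        double[] weights = {0.0, 0.0}; // {intercept, slope}
        for (int i = 0; i < iterations; i++) {
            double[] sum = new double[3];
            for (double[][] partition : partitions) { // would run in parallel
                double[] g = partialGradient(partition, weights);
                for (int k = 0; k < sum.length; k++) {
                    sum[k] += g[k]; // combine the partial results
                }
            }
            // New weights replace the old ones and become the input of the next round.
            weights = new double[] {
                weights[0] - learningRate * sum[0] / sum[2],
                weights[1] - learningRate * sum[1] / sum[2]
            };
        }
        return weights;
    }

    public static void main(String[] args) {
        List<double[][]> partitions = Arrays.asList(
                new double[][] {{1, 3}, {2, 5}},
                new double[][] {{3, 7}, {4, 9}});
        System.out.println(Arrays.toString(fit(partitions, 5000, 0.05)));
    }
}
```

In the DataSet API the coefficients would presumably play the role of the iterated DataSet, with the current values made available to the parallel map instances (for example as a broadcast set). Treat that mapping as an assumption to check against the iterations documentation.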

2016-06-09 15:01 GMT+02:00 Ravikumar Hawaldar <[hidden email]>:

Re: How to maintain the state of a variable in a map transformation.

Ravikumar Hawaldar
Hi Fabian, thank you for your help.

I want my Flink application to be distributed, and I also want to be able to update the variable [coefficients of LinearRegression].

How would you do it in that case?

The problem with iterations is that they expect a DataSet of the same type to be fed back, and my variable is just a double[]. Otherwise I would have to map every record to a Tuple2 that wraps a double[] and then try out iterations, but I am sure this won't work either.

Can I use closures or lambdas to maintain global state?


Regards,
Ravikumar

On 9 June 2016 at 20:17, Fabian Hueske <[hidden email]> wrote:

Re: How to maintain the state of a variable in a map transformation.

Maximilian Michels
Hi Ravikumar,

In short: No, you can't use closures to maintain a global state. If
you want to keep a truly global state, you'll have to use
parallelism 1 or an external data store to hold that state.

Is it possible to break up your global state into a set of local
states which can be combined in the end? That way, you can take
advantage of distributed parallel processing.
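
One way to read this suggestion, sketched in plain Java without Flink: if the model can be computed from sums over the records, each parallel instance can keep its own sums, and the sums can be added together at the end. The sketch assumes simple linear regression with one feature and uses the closed-form least-squares solution instead of a per-record update, so it is a different technique from the sequential update described earlier in the thread. All names (localState, merge, solve) are made up for illustration.

```java
import java.util.Arrays;

// Plain-Java sketch: local states that can be combined at the end. No Flink involved.
class LocalStatesSketch {

    /** Local state of one partition: {n, sum(x), sum(y), sum(x*x), sum(x*y)}. */
    static double[] localState(double[][] partition) {
        double[] s = new double[5];
        for (double[] r : partition) { // r = {x, y}
            s[0] += 1;
            s[1] += r[0];
            s[2] += r[1];
            s[3] += r[0] * r[0];
            s[4] += r[0] * r[1];
        }
        return s;
    }

    /** Combining two local states is element-wise addition. */
    static double[] merge(double[] a, double[] b) {
        double[] out = new double[a.length];
        for (int i = 0; i < a.length; i++) {
            out[i] = a[i] + b[i];
        }
        return out;
    }

    /** Closed-form least squares for y = intercept + slope * x; returns {intercept, slope}. */
    static double[] solve(double[] s) {
        double n = s[0], sx = s[1], sy = s[2], sxx = s[3], sxy = s[4];
        double slope = (n * sxy - sx * sy) / (n * sxx - sx * sx);
        double intercept = (sy - slope * sx) / n;
        return new double[] {intercept, slope};
    }

    public static void main(String[] args) {
        double[][] partition1 = {{1, 3}, {2, 5}};
        double[][] partition2 = {{3, 7}, {4, 9}};
        // Each partition is summarized independently, then the summaries are merged.
        double[] global = merge(localState(partition1), localState(partition2));
        System.out.println(Arrays.toString(solve(global)));
    }
}
```

Because merge is just element-wise addition, it is associative and commutative, which is the property a reduce-style combination step needs.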

Cheers,
Max

On Fri, Jun 10, 2016 at 8:28 AM, Ravikumar Hawaldar
<[hidden email]> wrote:


Re: How to maintain the state of a variable in a map transformation.

Ravikumar Hawaldar
Hi Maximilian,

Thank you for the response.

Yes, it is possible to break up the global state, but it is very tricky to merge two local state variables, and I would also have to refactor my algorithm logic.

Is there a way to create a new object every time in the reduce function so that I can assign the computed variable to it, like Spark's aggregate?


Regards,
Ravikumar

On 13 June 2016 at 15:24, Maximilian Michels <[hidden email]> wrote: