Forced to use Solution Set in Step Function

Re: Forced to use Solution Set in Step Function

Stephan Ewen
That is an interesting case. Everything that is loop invariant is computed once outside the loop. You are looking for a way to make this part of the loop.

Can you try making the filter part of the "VectorDataSet.add(old_sum, current)" operation?
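
A loose sketch of that suggestion (untested; since old_sum and the selected residual are single vectors, a cross is substituted here for the id-join inside add, and Vector is assumed to have an id field and an element-wise +):

// The rich filter is now connected to the workset (old_sum) through the cross,
// so it runs inside the iteration and may ask for the superstep number.
val sum = (old_sum cross residual_2)
  .filter(new RichFilterFunction[(Vector, Vector)] {
    def filter(p: (Vector, Vector)) = p._2.id == getIterationRuntimeContext.getSuperstepNumber
  })
  .map {p => p._1 + p._2}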

On Tue, Oct 14, 2014 at 5:05 PM, Maximilian Alber <[hidden email]> wrote:
The delta iteration calculates the cumulative sum (prefix sum) of residual_2.

I'm not sure I got your idea. I could add residual_2 to the workset, but in that case I need to separate the "old_sum" (the sum up to now) and residual_2 in each iteration. Actually I had that before; then I tried the Scala closure to clean up the code.

On Tue, Oct 14, 2014 at 4:57 PM, Fabian Hueske <[hidden email]> wrote:
Yep, I see your point.
Conceptually, all data that changes and affects the result of an iteration should be part of the workset.
Hence, the model kind of assumes that the datum "superStepNumber" should be part of the workset.

I am not familiar with your application, but would it make sense to add the number as an additional attribute to the workset data set and increase it manually?
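
A rough sketch of that idea (untested; it reuses env, config, residual_2, emptyDataSet and Vector from the code quoted further down, assumes the usual import org.apache.flink.api.scala._ is in scope, and assumes the ids of residual_2 run from 1 to N):

// Carry a manual step counter in the workset instead of asking for the superstep number.
val start = env.fromElements((0, Vector.zeros(config.dimensions)))   // (counter, running sum)
val cumSum = emptyDataSet.iterateDelta(start, config.N, Array("id")) {
  (solutionset, workset) =>
    val next = workset
      .cross(residual_2)                                   // the single workset element against all residuals
      .filter { case ((step, _), r) => r.id == step + 1 }  // plain filter: the counter replaces getSuperstepNumber
      .map { case ((step, sum), r) => (step + 1, sum + r) }
    // Delta for the solution set, tagged with the step id; the counter travels on in the workset.
    (next map { case (step, sum) => new Vector(step, sum.values) }, next)
}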

2014-10-14 16:45 GMT+02:00 Maximilian Alber <[hidden email]>:
OK, that sounds right, but somehow I would like to execute it inside the iteration. So I probably need to do some dummy work to make it part of it?

On Tue, Oct 14, 2014 at 4:36 PM, Aljoscha Krettek <[hidden email]> wrote:
Dammit you beat me to it. But yes, this is exactly what I was just writing.

On Tue, Oct 14, 2014 at 4:35 PM, Fabian Hueske <[hidden email]> wrote:
Hi,

I'm not super familiar with the iterations, but my guess would be that the filter is not evaluated as part of the iteration. Since it is not connected to the workset, the filter is not part of the loop and is evaluated once outside, where no superstep number is available. I guess moving the filter outside of the loop would give the same error.

Cheers, Fabian

2014-10-14 16:18 GMT+02:00 Maximilian Alber <[hidden email]>:
Hmm, or maybe not. With this code I get a strange error:

def createPlan_find_center(env: ExecutionEnvironment) = {
  val X = env readTextFile config.xFile map {Vector.parseFromString(config.dimensions, _)}
  val residual = env readTextFile config.yFile map {Vector.parseFromString(_)}
  val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}

  val residual_2 = residual * residual
  val ys = (residual_2 sumV) * (randoms filter {_.id == 0})

  val emptyDataSet = env.fromCollection[Vector](Seq())
  val sumVector = env.fromCollection(Seq(Vector.zeros(config.dimensions)))
  val cumSum = emptyDataSet.iterateDelta(sumVector, config.N, Array("id")) {
    (solutionset, old_sum) =>
      val current = residual_2 filter (new RichFilterFunction[Vector] {
        def filter(x: Vector) = x.id == getIterationRuntimeContext.getSuperstepNumber
      })
      val sum = VectorDataSet.add(old_sum, current)

      (sum map (new RichMapFunction[Vector, Vector] {
        def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
      }),
      sum)
  }
}
Error:
10/14/2014 15:57:35: Job execution switched to status RUNNING
10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to SCHEDULED
10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to DEPLOYING
10/14/2014 15:57:35: DataSource ([]) (1/1) switched to SCHEDULED
10/14/2014 15:57:35: DataSource ([]) (1/1) switched to DEPLOYING
10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to RUNNING
10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to SCHEDULED
10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to DEPLOYING
10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
10/14/2014 15:57:35: DataSource ([]) (1/1) switched to RUNNING
10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to RUNNING
10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to SCHEDULED
10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to DEPLOYING
10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to RUNNING
10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to FAILED
java.lang.IllegalStateException: This stub is not part of an iteration step function.
    at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
    at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
    at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
    at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
    at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
    at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
    at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
    at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
    at java.lang.Thread.run(Thread.java:745)

10/14/2014 15:57:36: Job execution switched to status FAILING
10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELING
10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELING
10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELING
10/14/2014 15:57:36: Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
10/14/2014 15:57:36: DataSink(TextOutputFormat (/tmp/tmplSYJ7S) - UTF-8) (1/1) switched to CANCELED
10/14/2014 15:57:36: Sync (WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
10/14/2014 15:57:36: Map (bumpboost.BumpBoost$$anonfun$8$$anon$2) (1/1) switched to CANCELED
10/14/2014 15:57:36: SolutionSet Delta (1/1) switched to CANCELED
10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELED
10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELED
10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
10/14/2014 15:57:36: Job execution switched to status FAILED
Error: The program execution failed: java.lang.IllegalStateException: This stub is not part of an iteration step function.
    at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
    at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
    at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
    at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
    at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
    at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
    at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
    at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
    at java.lang.Thread.run(Thread.java:745)

On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber <[hidden email]> wrote:
Should work now.
Cheers

On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber <[hidden email]> wrote:
Ok, thanks.
Please let me know when it is fixed.

Cheers
Max

On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen <[hidden email]> wrote:
Thank you, I will look into that...





Re: Forced to use Solution Set in Step Function

Aljoscha Krettek
Maybe you could use the residual_2 data set as a broadcast data set, i.e. make it available in the operation that adds the residual for the current iteration number to the old_sum. (I'm not sure what the VectorDataSet.add() method does here.) If you gave me the complete code, I could try to find an elegant solution to the problem.
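
A minimal sketch of the broadcast variant (untested and hedged: it assumes the usual import org.apache.flink.api.scala._, that withBroadcastSet/getBroadcastVariable behave inside delta iterations as they do outside, and that Vector has an id field and an element-wise +, as in the VectorDataSet helpers further down):

import scala.collection.JavaConverters._
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Inside the step function: old_sum is the workset, so this map runs per superstep.
val sum = old_sum
  .map(new RichMapFunction[Vector, Vector] {
    private var current: Vector = _
    override def open(parameters: Configuration): Unit = {
      val step = getIterationRuntimeContext.getSuperstepNumber
      // Pick the residual for this superstep out of the broadcast copy of residual_2.
      current = getRuntimeContext.getBroadcastVariable[Vector]("residual_2")
        .asScala.find(_.id == step).get
    }
    def map(old: Vector) = old + current
  })
  .withBroadcastSet(residual_2, "residual_2")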


Re: Forced to use Solution Set in Step Function

Maximilian Alber
In reply to this post by Stephan Ewen
It should be no problem to make it part of the loop or to get residual_2 into the workset; it is just a bit inefficient.

No, that would not make sense. old_sum and current are two data sets with just one vector each.

I think putting residual_2 into the workset and then filtering out current and old_sum each iteration is the best workaround.
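
A rough sketch of that workaround (untested; it assumes the residual ids run from 1 to N and reuses the initial zero vector's id, apparently -1 judging from the DataSource ([-1 0.0]) line in the log, as the tag of the running sum):

val cumSum = emptyDataSet.iterateDelta(residual_2 union sumVector, config.N, Array("id")) {
  (solutionset, workset) =>
    // All filters below consume the workset, so they run inside the loop.
    val old_sum = workset filter {_.id == -1}
    val current = workset filter (new RichFilterFunction[Vector] {
      def filter(x: Vector) = x.id == getIterationRuntimeContext.getSuperstepNumber
    })
    val rest = workset filter (new RichFilterFunction[Vector] {
      def filter(x: Vector) = x.id > getIterationRuntimeContext.getSuperstepNumber
    })
    // Add the two single vectors; the result keeps the superstep id for the delta.
    val summed = (old_sum cross current) map {x => new Vector(x._2.id, (x._1 + x._2).values)}
    // Next workset: the remaining residuals plus the running sum, re-tagged with -1.
    (summed, rest union (summed map {x => new Vector(-1, x.values)}))
}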


Re: Forced to use Solution Set in Step Function

Maximilian Alber
In reply to this post by Aljoscha Krettek
Ok, that's possible too.

VectorDataSet is just Scala magic to ease my life (see below). If you want to take a look, I attached the package. The main code is in BumpBoost.scala; the vector stuff is in util.scala.
Thanks!

class VectorDataSet(X: DataSet[Vector]) {
  def + (Y: DataSet[Vector]) = VectorDataSet.add(X, Y)
  def - (Y: DataSet[Vector]) = VectorDataSet.subt(X, Y)
  def * (Y: DataSet[Vector]) = VectorDataSet.mult(X, Y)
  def / (Y: DataSet[Vector]) = VectorDataSet.div(X, Y)

  def zip(Y: DataSet[Vector]) = VectorDataSet.zip(X, Y)
  def dot(Y: DataSet[Vector]) = VectorDataSet.dot(X, Y)
  def sumV() = VectorDataSet.sumV(X)
}

object VectorDataSet {
  def add(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 + x._2}
  def subt(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 - x._2}
  def mult(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 * x._2}
  def div(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 / x._2}

  def zip(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 join X2 where "id" equalTo "id"
  def dot(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 * X2 sumV
  def sumV(X1: DataSet[Vector]) = X1 reduce {_ + _}

  implicit def vectorDataSetWrapper(ds: DataSet[Vector]) = new VectorDataSet(ds)
}
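
With the implicit in scope, the operators read like plain vector arithmetic. A hedged usage example (assuming two id-aligned DataSet[Vector] values a and b in scope):

import VectorDataSet._   // brings vectorDataSetWrapper into scope

val s = a + b            // zip (join on "id"), then element-wise +
val d = a dot b          // element-wise *, then reduced to a single vector by sumV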


Attachment: bb.tar.gz (225K)

Re: Forced to use Solution Set in Step Function

Stephan Ewen
BTW: The current master allows you to not join with the solution set, and only use it to accumulate data.
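
A toy illustration of that pattern (hedged: a sketch against the Scala DataSet API in general, not checked against that exact master build). The step function never reads the solution set; it only emits deltas into it:

import org.apache.flink.api.scala._

object AccumulateOnly {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val solutionInit = env.fromElements((0, 0.0))   // (id, value)
    val worksetInit  = env.fromElements((1, 1.0))
    val result = solutionInit.iterateDelta(worksetInit, 5, Array(0)) {
      (solution, workset) =>                        // `solution` is never joined with
        val next = workset map { case (id, v) => (id + 1, v * 2) }
        (next, next)                                // the delta only accumulates new entries
    }
    result.print()                                  // the accumulated solution set
  }
}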

On Tue, Oct 14, 2014 at 5:29 PM, Maximilian Alber <[hidden email]> wrote:
Ok, that's possible too.

VectorDataSet is just scala magic to ease my life (See below). If you want to take a look, I appended the package. The main code is in BumpBoost.scala. In util.scala is the vector stuff.
Thanks!

class VectorDataSet(X: DataSet[Vector]){
def + (Y: DataSet[Vector]) = VectorDataSet.add(X, Y)
def - (Y: DataSet[Vector]) = VectorDataSet.subt(X, Y)
def * (Y: DataSet[Vector]) = VectorDataSet.mult(X, Y)
def / (Y: DataSet[Vector]) = VectorDataSet.div(X, Y)

def zip(Y: DataSet[Vector]) = VectorDataSet.zip(X, Y)
def dot(Y: DataSet[Vector]) = VectorDataSet.dot(X, Y)
def sumV() = VectorDataSet.sumV(X)
}
object VectorDataSet {
def add(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 + x._2}
def subt(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 - x._2}
def mult(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 * x._2}
def div(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 / x._2}

def zip(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 join X2 where "id" equalTo "id"
def dot(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 * X2 sumV
def sumV(X1: DataSet[Vector]) = X1 reduce {_ + _}

implicit def vectorDataSetWrapper(ds: DataSet[Vector]) = new VectorDataSet(ds)
}


On Tue, Oct 14, 2014 at 5:21 PM, Aljoscha Krettek <[hidden email]> wrote:
Maybe you could use the residual_2 data set as a broadcast dataset.
i.e. make in available in the operation that adds the residual for the
current iteration number to the old_sum. (I'm not sure what the
VectorDataSet.add() method does here). If you gave me the complete
code I could try finding an elegant solution to that problem.

On Tue, Oct 14, 2014 at 5:15 PM, Stephan Ewen <[hidden email]> wrote:
> That is an interesting case. Everything that is loop invariant is computed
> once outside the loop. You are looking for a way to make this part of the
> loop.
>
> Can you try making the filter part of the "VectorDataSet.add(old_sum,
> current)" operation?
>
> On Tue, Oct 14, 2014 at 5:05 PM, Maximilian Alber
> <[hidden email]> wrote:
>>
>> The deltaiteration calculates the cumulative sum (prefix sum) of
>> residual_2.
>>
>> I'm not sure if got you idea. I could add residual_2 to the workset. But
>> in that case I need to separate the "old_sum"(the sum up to now) and
>> residual_2 each iteration. Actually I had that before, then I tried the
>> Scala closure to clean up the code.
>>
>> On Tue, Oct 14, 2014 at 4:57 PM, Fabian Hueske <[hidden email]> wrote:
>>>
>>> Jep, I see you point.
>>> Conceptually, all data that changes and affects the result of an
>>> iteration should be part of the workset.
>>> Hence, the model kind of assumes that the datum "superStepNumber" should
>>> be part of the workset.
>>>
>>> I am not familiar with your application, but would it make sense to add
>>> the number as an additional attribute to the workset data set and increase
>>> it manually?
>>>
>>> 2014-10-14 16:45 GMT+02:00 Maximilian Alber <[hidden email]>:
>>>>
>>>> Ok, sounds true, but somehow I would like to execute it inside of it. So
>>>> I probably need to do some nonsense work to make it part of it?
>>>>
>>>> On Tue, Oct 14, 2014 at 4:36 PM, Aljoscha Krettek <[hidden email]>
>>>> wrote:
>>>>>
>>>>> Dammit you beat me to it. But yes, this is exactly what I was just
>>>>> writing.
>>>>>
>>>>> On Tue, Oct 14, 2014 at 4:35 PM, Fabian Hueske <[hidden email]>
>>>>> wrote:
>>>>> > Hi,
>>>>> >
>>>>> > I'm not super familiar with the iterations, but my guess would be
>>>>> > that the
>>>>> > filter is not evaluated as part of the iteration.
>>>>> > Since it is not connect to the workset, the filter is not part of the
>>>>> > loop
>>>>> > and evaluated once outside where no superset number is available.
>>>>> > I guess, moving the filter outside of the loop gives the same error.
>>>>> >
>>>>> > Cheers, Fabian
>>>>> >
>>>>> >
>>>>> >
>>>>> > 2014-10-14 16:18 GMT+02:00 Maximilian Alber
>>>>> > <[hidden email]>:
>>>>> >>
>>>>> >> Hmm or maybe not. With this code I get some strange error:
>>>>> >>
>>>>> >> def createPlan_find_center(env: ExecutionEnvironment) = {
>>>>> >> val X = env readTextFile config.xFile map
>>>>> >> {Vector.parseFromString(config.dimensions, _)};
>>>>> >> val residual = env readTextFile config.yFile map
>>>>> >> {Vector.parseFromString(_)};
>>>>> >> val randoms = env readTextFile config.randomFile map
>>>>> >> {Vector.parseFromString(_)}
>>>>> >>
>>>>> >> val residual_2 = residual * residual
>>>>> >> val ys = (residual_2 sumV) * (randoms filter {_.id == 0})
>>>>> >>
>>>>> >> val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>> >> val sumVector =
>>>>> >> env.fromCollection(Seq(Vector.zeros(config.dimensions)))
>>>>> >> val cumSum = emptyDataSet.iterateDelta(sumVector, config.N,
>>>>> >> Array("id")) {
>>>>> >>     (solutionset, old_sum) =>
>>>>> >>     val current = residual_2 filter (new RichFilterFunction[Vector]{
>>>>> >>       def filter(x: Vector) = x.id ==
>>>>> >> (getIterationRuntimeContext.getSuperstepNumber)
>>>>> >>     })
>>>>> >>     val sum = VectorDataSet.add(old_sum, current)
>>>>> >>
>>>>> >>     (sum map (new RichMapFunction[Vector, Vector]{
>>>>> >>       def map(x: Vector) = new
>>>>> >> Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
>>>>> >>     }),
>>>>> >>     sum)
>>>>> >> }
>>>>> >>
>>>>> >> Error:
>>>>> >> 10/14/2014 15:57:35: Job execution switched to status RUNNING
>>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to SCHEDULED
>>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to DEPLOYING
>>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to SCHEDULED
>>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to DEPLOYING
>>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
>>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
>>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to RUNNING
>>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to SCHEDULED
>>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to DEPLOYING
>>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
>>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to RUNNING
>>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to RUNNING
>>>>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to SCHEDULED
>>>>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to DEPLOYING
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to RUNNING
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to FAILED
>>>>> >> java.lang.IllegalStateException: This stub is not part of an iteration step function.
>>>>> >>     at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>>>> >>     at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>>>>> >>     at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>>>>> >>     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>>>> >>     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>>>> >>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>> >>     at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>>>>> >>     at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>>>>> >>     at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>>>> >>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>>>> >>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>>>> >>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>>>>> >>     at java.lang.Thread.run(Thread.java:745)
>>>>> >>
>>>>> >> 10/14/2014 15:57:36: Job execution switched to status FAILING
>>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELING
>>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
>>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELING
>>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELING
>>>>> >> 10/14/2014 15:57:36: Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: DataSink(TextOutputFormat (/tmp/tmplSYJ7S) - UTF-8) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: Sync (WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
>>>>> >> 10/14/2014 15:57:36: Map (bumpboost.BumpBoost$$anonfun$8$$anon$2) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: SolutionSet Delta (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: Job execution switched to status FAILED
>>>>> >> Error: The program execution failed: java.lang.IllegalStateException: This stub is not part of an iteration step function.
>>>>> >>     at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>>>> >>     at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>>>>> >>     at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>>>>> >>     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>>>> >>     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>>>> >>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>> >>     at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>>>>> >>     at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>>>>> >>     at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>>>> >>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>>>> >>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>>>> >>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>>>>> >>     at java.lang.Thread.run(Thread.java:745)
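
The trace pins down the diagnosis quoted above: the RichFilterFunction is chained to the loop-invariant join outside the iteration, so getIterationRuntimeContext() has no iteration to report on. One hedged way around it is to connect the superstep-dependent check to the workset itself, so it runs inside the step function. An untested sketch, reusing the thread's Vector type:

// old_sum is the workset, so operators chained to this cross are part of
// the step function and may ask for the superstep number
val sum = (old_sum cross residual_2)
  .filter(new RichFilterFunction[(Vector, Vector)] {
    def filter(p: (Vector, Vector)) =
      p._2.id == getIterationRuntimeContext.getSuperstepNumber
  })
  .map { p => p._1 + p._2 }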
>>>>> >>
>>>>> >> On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber
>>>>> >> <[hidden email]> wrote:
>>>>> >>>
>>>>> >>> Should work now.
>>>>> >>> Cheers
>>>>> >>>
>>>>> >>> On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber
>>>>> >>> <[hidden email]> wrote:
>>>>> >>>>
>>>>> >>>> Ok, thanks.
>>>>> >>>> Please let me know when it is fixed.
>>>>> >>>>
>>>>> >>>> Cheers
>>>>> >>>> Max
>>>>> >>>>
>>>>> >>>> On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen <[hidden email]>
>>>>> >>>> wrote:
>>>>> >>>>>
>>>>> >>>>> Thank you, I will look into that...
>>>>> >>>>
>>>>> >>>>
>>>>> >>>
>>>>> >>
>>>>> >
>>>>
>>>>
>>>
>>
>


Reply | Threaded
Open this post in threaded view
|

Re: Forced to use Solution Set in Step Function

Aljoscha Krettek
Could you maybe also give some examples for the input expected by your program?

Also, the residual DataSet contains several Vectors, while the sum (or
old_sum) DataSet always contains only 1 Vector. Correct?

On Tue, Oct 14, 2014 at 6:04 PM, Stephan Ewen <[hidden email]> wrote:

> BTW: The current master allows you to not join with the solution set, and
> only use it to accumulate data.
>
> On Tue, Oct 14, 2014 at 5:29 PM, Maximilian Alber
> <[hidden email]> wrote:
>>
>> Ok, that's possible too.
>>
>> VectorDataSet is just Scala magic to ease my life (see below). If you want
>> to take a look, I appended the package. The main code is in BumpBoost.scala;
>> the vector stuff is in util.scala.
>> Thanks!
>>
>> class VectorDataSet(X: DataSet[Vector]) {
>>   def + (Y: DataSet[Vector]) = VectorDataSet.add(X, Y)
>>   def - (Y: DataSet[Vector]) = VectorDataSet.subt(X, Y)
>>   def * (Y: DataSet[Vector]) = VectorDataSet.mult(X, Y)
>>   def / (Y: DataSet[Vector]) = VectorDataSet.div(X, Y)
>>
>>   def zip(Y: DataSet[Vector]) = VectorDataSet.zip(X, Y)
>>   def dot(Y: DataSet[Vector]) = VectorDataSet.dot(X, Y)
>>   def sumV() = VectorDataSet.sumV(X)
>> }
>>
>> object VectorDataSet {
>>   def add(X1: DataSet[Vector], X2: DataSet[Vector])  = X1 zip X2 map {x => x._1 + x._2}
>>   def subt(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 - x._2}
>>   def mult(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 * x._2}
>>   def div(X1: DataSet[Vector], X2: DataSet[Vector])  = X1 zip X2 map {x => x._1 / x._2}
>>
>>   def zip(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 join X2 where "id" equalTo "id"
>>   def dot(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 * X2 sumV
>>   def sumV(X1: DataSet[Vector]) = X1 reduce {_ + _}
>>
>>   implicit def vectorDataSetWrapper(ds: DataSet[Vector]) = new VectorDataSet(ds)
>> }
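
With the implicit vectorDataSetWrapper in scope, a DataSet[Vector] picks up these operators. An illustration only, mirroring expressions used elsewhere in the thread:

val residual_2 = residual * residual   // element-wise multiply, joined on id
val total = (residual_2 sumV)          // reduce with Vector's +
val dp = residual dot randoms          // i.e. (residual * randoms) sumV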
>>
>>
>> On Tue, Oct 14, 2014 at 5:21 PM, Aljoscha Krettek <[hidden email]>
>> wrote:
>>>
>>> Maybe you could use the residual_2 data set as a broadcast data set,
>>> i.e. make it available in the operation that adds the residual for the
>>> current iteration number to the old_sum. (I'm not sure what the
>>> VectorDataSet.add() method does here.) If you gave me the complete
>>> code, I could try finding an elegant solution to that problem.
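
A minimal sketch of that broadcast idea, assuming the rewritten Scala API's withBroadcastSet / getBroadcastVariable and the Vector type from util.scala; untested:

import scala.collection.JavaConverters._

// inside the step function: old_sum is the workset (a single Vector),
// residual_2 is shipped into the loop as a broadcast set
val sum = old_sum map (new RichMapFunction[Vector, Vector] {
  def map(old: Vector) = {
    val step = getIterationRuntimeContext.getSuperstepNumber
    val residuals =
      getRuntimeContext.getBroadcastVariable[Vector]("residual_2").asScala
    // pick the entry for this superstep and add it to the running sum
    old + residuals.find(_.id == step).get
  }
}) withBroadcastSet (residual_2, "residual_2")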
Reply | Threaded
Open this post in threaded view
|

Re: Forced to use Solution Set in Step Function

Maximilian Alber
@Stephan: Thanks! So I'm gonna switch!

Sorry, my bad. I will provide you with some samples by tomorrow morning.

Yes. It's a workaround, because I cannot transfer them into variables. Or can I by now (or will I ever)?

Maybe some explanation of my solution:
- X is for me a matrix of shape (N, d), modeled in Flink as a DataSet of Vectors. Each Vector has an ID, which is the row number, and an array of numbers, the actual row.
- Y is for me a matrix of shape (N, 1), thus actually a column vector.
- old_sum is either a scalar if d == 1, or a row vector aka a matrix of shape (1, N), or a DataSet with one Vector. (By now I have the convention of giving them id -1; it comes from a former workaround...)

The whole ID story comes from the fact that I need to know which entries belong together in mathematical operations (see my zip function). You can look that up in util.scala; that's kind of my math library. I don't want to imagine the mess in Java :)
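
Roughly, the Vector type this describes looks like the following. A simplified reconstruction; the real class is in util.scala and may differ in details:

// id = row number, values = the actual row; arithmetic is element-wise
class Vector(val id: Int, val values: Array[Double]) {
  def +(o: Vector) = new Vector(id, (values, o.values).zipped map (_ + _))
  def -(o: Vector) = new Vector(id, (values, o.values).zipped map (_ - _))
  // * and / analogous; sums/scalars get id -1 by the convention above
}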

Cheers
Max




Reply | Threaded
Open this post in threaded view
|

Re: Forced to use Solution Set in Step Function

Aljoscha Krettek
Transferring to variables: unfortunately not possible right now, but we are working on it.

>> >>> >>>>> >> ->
>> >>> >>>>> >> Map
>> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched
>> >>> >>>>> >> to
>> >>> >>>>> >> CANCELING
>> >>> >>>>> >> 10/14/2014 15:57:36: Map
>> >>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$2)
>> >>> >>>>> >> (1/1)
>> >>> >>>>> >> switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: SolutionSet Delta (1/1) switched to
>> >>> >>>>> >> CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to
>> >>> >>>>> >> CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat
>> >>> >>>>> >> (/tmp/tmpBhOsLd) -
>> >>> >>>>> >> UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1)
>> >>> >>>>> >> (1/1)
>> >>> >>>>> >> switched to
>> >>> >>>>> >> CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to
>> >>> >>>>> >> CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>> >>> >>>>> >>
>> >>> >>>>> >> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>> >>> >>>>> >> ->
>> >>> >>>>> >> Map
>> >>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched
>> >>> >>>>> >> to
>> >>> >>>>> >> CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed
>> >>> >>>>> >> Delta
>> >>> >>>>> >> Iteration)) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status FAILED
>> >>> >>>>> >> Error: The program execution failed:
>> >>> >>>>> >> java.lang.IllegalStateException: This
>> >>> >>>>> >> stub is not part of an iteration step function.
>> >>> >>>>> >> at
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >> org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>> >>> >>>>> >> at
>> >>> >>>>> >>
>> >>> >>>>> >> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>> >>> >>>>> >> at
>> >>> >>>>> >>
>> >>> >>>>> >> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>> >>> >>>>> >> at
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>> >>> >>>>> >> at
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>> >>> >>>>> >> at
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>> >>> >>>>> >> at
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >> org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>> >>> >>>>> >> at
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >> org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>> >>> >>>>> >> at
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>> >>> >>>>> >> at
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>> >>> >>>>> >> at
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>> >>> >>>>> >> at
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >>
>> >>> >>>>> >> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>> >>> >>>>> >> at java.lang.Thread.run(Thread.java:745)
>> >>> >>>>> >>
>> >>> >>>>> >> On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber
>> >>> >>>>> >> <[hidden email]> wrote:
>> >>> >>>>> >>>
>> >>> >>>>> >>> Should work now.
>> >>> >>>>> >>> Cheers
>> >>> >>>>> >>>
>> >>> >>>>> >>> On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber
>> >>> >>>>> >>> <[hidden email]> wrote:
>> >>> >>>>> >>>>
>> >>> >>>>> >>>> Ok, thanks.
>> >>> >>>>> >>>> Please let me know when it is fixed.
>> >>> >>>>> >>>>
>> >>> >>>>> >>>> Cheers
>> >>> >>>>> >>>> Max
>> >>> >>>>> >>>>
>> >>> >>>>> >>>> On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen
>> >>> >>>>> >>>> <[hidden email]>
>> >>> >>>>> >>>> wrote:
>> >>> >>>>> >>>>>
>> >>> >>>>> >>>>> Thank you, I will look into that...
>> >>> >>>>> >>>>
>> >>> >>>>> >>>>
>> >>> >>>>> >>>
>> >>> >>>>> >>
>> >>> >>>>> >
>> >>> >>>>
>> >>> >>>>
>> >>> >>>
>> >>> >>
>> >>> >
>> >>
>> >>
>> >
>
>

Re: Forced to use Solution Set in Step Function

Maximilian Alber
Ok.

Here is an input variant:
flink run -v bump_boost-0.1.jar -c bumpboost.Job x_file=X y_file=Y out_file=/tmp/tmpnWYamw random_file=random_file dimensions=1 N=100 width_candidates_file=/tmp/tmpTJ4LDh iterations=30 multi_bump_boost=0 gradient_descent_iterations=30 cache=False min_width=-4 max_width=6 min_width_update=1e-08 max_width_update=10

width_candidates_file is not needed for now. X, Y and random_file are attached.

If you have problems running it, let me know!
Thanks!
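
As a side note on the command line above: here is a minimal sketch of how such key=value arguments could be parsed in a Job entry point. The real bumpboost.Job lives in the attached package and is not shown in this thread, so every name below is an assumption for illustration only.

object Job {
  def main(args: Array[String]): Unit = {
    // Split each "key=value" argument once and collect the pairs into a map.
    val conf: Map[String, String] =
      args.map(_.split("=", 2)).collect { case Array(k, v) => k -> v }.toMap

    val n          = conf("N").toInt          // e.g. N=100
    val dimensions = conf("dimensions").toInt // e.g. dimensions=1
    val xFile      = conf("x_file")           // e.g. x_file=X

    println(s"would run with N=$n, d=$dimensions, X=$xFile")
  }
}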



Re: Forced to use Solution Set in Step Function

Maximilian Alber
@Aljoscha Sorry, I just tried my workaround. There are some minor conceptual bugs (caused by the ids...). I attached the new version. Unfortunately, the compiler breaks on it; an issue is already open.
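
Not the attached code itself, but a minimal sketch of the counter-in-the-workset idea from earlier in the thread, using plain (id, value) tuples in place of the Vector class from util.scala. The id of the running sum doubles as the manually incremented superstep counter, so no getSuperstepNumber call is needed inside the loop:

import org.apache.flink.api.scala._

object CumSumWorkaround {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Loop-invariant input: residual_2 as (id, value) pairs.
    val residual2 = env.fromElements((1, 2.0), (2, 3.0), (3, 5.0))

    // The running sum; its first field acts as the step counter.
    val initial = env.fromElements((0, 0.0))

    val cumSum = initial.iterateDelta(initial, 3, Array("_1")) {
      (solution, oldSum) =>
        // Joining on "counter + 1" picks exactly the element for this step,
        // replacing the superstep-number lookup of the original filter.
        val newSum = oldSum.join(residual2)
          .where(sum => sum._1 + 1).equalTo(r => r._1) {
            (sum, cur) => (cur._1, sum._2 + cur._2)
          }
        // Each step inserts one prefix-sum entry into the solution set.
        (newSum, newSum)
    }

    cumSum.print() // (0,0.0), (1,2.0), (2,5.0), (3,10.0)
  }
}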




Attachment: bb.tar.gz (235K)

Re: Forced to use Solution Set in Step Function

Stephan Ewen
Hi!

The 0.7.0 release should have fixed that problem. Have you had a chance to try that out?

Greetings,
Stephan
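
For reference, a sketch of the other variant suggested in the thread: broadcasting residual_2 into an operator on the workset, so that the getSuperstepNumber call happens inside an operator that really is part of the step function. Vec is a stand-in for the Vector class from util.scala, which is not shown here:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

case class Vec(id: Int, value: Double)

object CumSumBroadcast {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val residual2 = env.fromElements(Vec(1, 2.0), Vec(2, 3.0), Vec(3, 5.0))
    val initial   = env.fromElements(Vec(0, 0.0))

    val cumSum = initial.iterateDelta(initial, 3, Array("id")) {
      (solution, oldSum) =>
        val newSum = oldSum.map(new RichMapFunction[Vec, Vec] {
          private var r2: Seq[Vec] = _

          override def open(parameters: Configuration): Unit =
            r2 = getRuntimeContext.getBroadcastVariable[Vec]("residual2").asScala.toSeq

          override def map(sum: Vec): Vec = {
            // Legal here: this map runs on the workset, inside the step function.
            val step = getIterationRuntimeContext.getSuperstepNumber
            val cur  = r2.find(_.id == step).map(_.value).getOrElse(0.0)
            Vec(step, sum.value + cur)
          }
        }).withBroadcastSet(residual2, "residual2")
        (newSum, newSum)
    }

    cumSum.print() // Vec(0,0.0), Vec(1,2.0), Vec(2,5.0), Vec(3,10.0)
  }
}

Note that the solution set is never joined here; each step only inserts a new entry, which is exactly the accumulate-only use of the solution set that the 0.7 line allows according to the thread.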


On Wed, Oct 15, 2014 at 3:43 PM, Maximilian Alber <[hidden email]> wrote:
@Aljoscha Sorry, I just tried my workaround. There are some minor conceptual bugs (caused by the id's...). I attached the new version. Unfortunately there the compiler breaks. An issue is already open.

On Wed, Oct 15, 2014 at 11:53 AM, Maximilian Alber <[hidden email]> wrote:
Ok.

Here is a input variant:
flink run -v bump_boost-0.1.jar -c bumpboost.Job x_file=X y_file=Y out_file=/tmp/tmpnWYamw random_file=random_file dimensions=1 N=100 width_candidates_file=/tmp/tmpTJ4LDh iterations=30 multi_bump_boost=0 gradient_descent_iterations=30 cache=False min_width=-4 max_width=6 min_width_update=1e-08 max_width_update=10

width_candidates_file is not needed by now. X, Y and random_file are attached.

If you have problems, running it, let me know!
Thanks!

On Tue, Oct 14, 2014 at 10:40 PM, Aljoscha Krettek <[hidden email]> wrote:
Transferring to variables: Unfortunately not possible right now but we
are working on it.

On Tue, Oct 14, 2014 at 8:53 PM, Maximilian Alber
<[hidden email]> wrote:
> @ Stephan: Thanks! So I gonna switch!
>
> Sorry, my bad. I will provide you some sample by tomorrow morning.
>
> Yes. Workaround, because I cannot transfer them into variables, can I by now
> (or will I ever)?
>
> Maybe some explanation to my solution:
> - X is for my a matrix of shape (N, d). Modeled in Flink as dataset of
> vectors. Each Vector has an ID which is the row number and an array with
> numbers, the actual row.
> - Y is for my a matrix of shape (N, 1) thus actually a column-vector.
> - old_sum is either a scalar if d == 1 or a row-vector aka matrix of shape
> (1, N) or a Dataset with one Vector. (By now I have the convention to give
> id -1 to them, comes from a former workaround...)
>
> The whole ID story comes from the fact that I need to know which stuff
> belongs together in mathematical operations (see my zip function). You can
> look that up in util.scala, that's kind of my math library. I don't want to
> imagine the mess in Java :)
>
> Cheers
> Max
>
>
>
> On Tue, Oct 14, 2014 at 6:28 PM, Aljoscha Krettek <[hidden email]>
> wrote:
>>
>> Could you maybe also give some examples for the input expected by your
>> program?
>>
>> Also, the residual DataSet contains several Vectors while the sum (or
>> old_sum) DataSet is always only contains 1 Vector. Correct?
>>
>> On Tue, Oct 14, 2014 at 6:04 PM, Stephan Ewen <[hidden email]> wrote:
>> > BTW: The current master allows you to not join with the solution set,
>> > and
>> > only use it to accumulate data.
>> >
>> > On Tue, Oct 14, 2014 at 5:29 PM, Maximilian Alber
>> > <[hidden email]> wrote:
>> >>
>> >> Ok, that's possible too.
>> >>
>> >> VectorDataSet is just scala magic to ease my life (See below). If you
>> >> want
>> >> to take a look, I appended the package. The main code is in
>> >> BumpBoost.scala.
>> >> In util.scala is the vector stuff.
>> >> Thanks!
>> >>
>> >> class VectorDataSet(X: DataSet[Vector]){
>> >> def + (Y: DataSet[Vector]) = VectorDataSet.add(X, Y)
>> >> def - (Y: DataSet[Vector]) = VectorDataSet.subt(X, Y)
>> >> def * (Y: DataSet[Vector]) = VectorDataSet.mult(X, Y)
>> >> def / (Y: DataSet[Vector]) = VectorDataSet.div(X, Y)
>> >>
>> >> def zip(Y: DataSet[Vector]) = VectorDataSet.zip(X, Y)
>> >> def dot(Y: DataSet[Vector]) = VectorDataSet.dot(X, Y)
>> >> def sumV() = VectorDataSet.sumV(X)
>> >> }
>> >> object VectorDataSet {
>> >> def add(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x =>
>> >> x._1 + x._2}
>> >> def subt(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x
>> >> =>
>> >> x._1 - x._2}
>> >> def mult(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x
>> >> =>
>> >> x._1 * x._2}
>> >> def div(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x =>
>> >> x._1 / x._2}
>> >>
>> >> def zip(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 join X2 where
>> >> "id"
>> >> equalTo "id"
>> >> def dot(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 * X2 sumV
>> >> def sumV(X1: DataSet[Vector]) = X1 reduce {_ + _}
>> >>
>> >> implicit def vectorDataSetWrapper(ds: DataSet[Vector]) = new
>> >> VectorDataSet(ds)
>> >> }
>> >>
>> >>
>> >> On Tue, Oct 14, 2014 at 5:21 PM, Aljoscha Krettek <[hidden email]>
>> >> wrote:
>> >>>
>> >>> Maybe you could use the residual_2 data set as a broadcast dataset.
>> >>> i.e. make in available in the operation that adds the residual for the
>> >>> current iteration number to the old_sum. (I'm not sure what the
>> >>> VectorDataSet.add() method does here). If you gave me the complete
>> >>> code I could try finding an elegant solution to that problem.
>> >>>
>> >>> On Tue, Oct 14, 2014 at 5:15 PM, Stephan Ewen <[hidden email]>
>> >>> wrote:
>> >>> > That is an interesting case. Everything that is loop invariant is
>> >>> > computed
>> >>> > once outside the loop. You are looking for a way to make this part
>> >>> > of
>> >>> > the
>> >>> > loop.
>> >>> >
>> >>> > Can you try making the filter part of the
>> >>> > "VectorDataSet.add(old_sum,
>> >>> > current)" operation?
>> >>> >
>> >>> > On Tue, Oct 14, 2014 at 5:05 PM, Maximilian Alber
>> >>> > <[hidden email]> wrote:
>> >>> >>
>> >>> >> The delta iteration calculates the cumulative sum (prefix sum) of
>> >>> >> residual_2.
>> >>> >>
>> >>> >> I'm not sure if I got your idea. I could add residual_2 to the
>> >>> >> workset. But in that case I need to separate the "old_sum" (the sum
>> >>> >> up to now) and residual_2 each iteration. Actually I had that
>> >>> >> before; then I tried the Scala closure to clean up the code.
>> >>> >>
>> >>> >> On Tue, Oct 14, 2014 at 4:57 PM, Fabian Hueske <[hidden email]>
>> >>> >> wrote:
>> >>> >>>
>> >>> >>> Yep, I see your point.
>> >>> >>> Conceptually, all data that changes and affects the result of an
>> >>> >>> iteration should be part of the workset. Hence, the model kind of
>> >>> >>> assumes that the datum "superStepNumber" should be part of the
>> >>> >>> workset.
>> >>> >>>
>> >>> >>> I am not familiar with your application, but would it make sense
>> >>> >>> to add the number as an additional attribute to the workset data
>> >>> >>> set and increase it manually?
>> >>> >>>
>> >>> >>> 2014-10-14 16:45 GMT+02:00 Maximilian Alber
>> >>> >>> <[hidden email]>:
>> >>> >>>>
>> >>> >>>> Ok, sounds true, but somehow I would like to execute it inside
>> >>> >>>> the iteration. So I probably need to do some nonsense work to
>> >>> >>>> make it part of it?
>> >>> >>>>
>> >>> >>>> On Tue, Oct 14, 2014 at 4:36 PM, Aljoscha Krettek
>> >>> >>>> <[hidden email]>
>> >>> >>>> wrote:
>> >>> >>>>>
>> >>> >>>>> Dammit you beat me to it. But yes, this is exactly what I was
>> >>> >>>>> just writing.
>> >>> >>>>>
>> >>> >>>>> On Tue, Oct 14, 2014 at 4:35 PM, Fabian Hueske
>> >>> >>>>> <[hidden email]>
>> >>> >>>>> wrote:
>> >>> >>>>> > Hi,
>> >>> >>>>> >
>> >>> >>>>> > I'm not super familiar with the iterations, but my guess would
>> >>> >>>>> > be that the filter is not evaluated as part of the iteration.
>> >>> >>>>> > Since it is not connected to the workset, the filter is not
>> >>> >>>>> > part of the loop and is evaluated once outside, where no
>> >>> >>>>> > superstep number is available.
>> >>> >>>>> > I guess moving the filter outside of the loop gives the same
>> >>> >>>>> > error.
>> >>> >>>>> >
>> >>> >>>>> > Cheers, Fabian
>> >>> >>>>> >
>> >>> >>>>> >
>> >>> >>>>> >
>> >>> >>>>> > 2014-10-14 16:18 GMT+02:00 Maximilian Alber
>> >>> >>>>> > <[hidden email]>:
>> >>> >>>>> >>
>> >>> >>>>> >> Hmm or maybe not. With this code I get some strange error:
>> >>> >>>>> >>
>> >>> >>>>> >> def createPlan_find_center(env: ExecutionEnvironment) = {
>> >>> >>>>> >>   val X = env readTextFile config.xFile map {Vector.parseFromString(config.dimensions, _)}
>> >>> >>>>> >>   val residual = env readTextFile config.yFile map {Vector.parseFromString(_)}
>> >>> >>>>> >>   val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
>> >>> >>>>> >>
>> >>> >>>>> >>   val residual_2 = residual * residual
>> >>> >>>>> >>   val ys = (residual_2 sumV) * (randoms filter {_.id == 0})
>> >>> >>>>> >>
>> >>> >>>>> >>   val emptyDataSet = env.fromCollection[Vector](Seq())
>> >>> >>>>> >>   val sumVector = env.fromCollection(Seq(Vector.zeros(config.dimensions)))
>> >>> >>>>> >>   val cumSum = emptyDataSet.iterateDelta(sumVector, config.N, Array("id")) {
>> >>> >>>>> >>     (solutionset, old_sum) =>
>> >>> >>>>> >>       val current = residual_2 filter (new RichFilterFunction[Vector] {
>> >>> >>>>> >>         def filter(x: Vector) = x.id == getIterationRuntimeContext.getSuperstepNumber
>> >>> >>>>> >>       })
>> >>> >>>>> >>       val sum = VectorDataSet.add(old_sum, current)
>> >>> >>>>> >>
>> >>> >>>>> >>       (sum map (new RichMapFunction[Vector, Vector] {
>> >>> >>>>> >>         def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
>> >>> >>>>> >>       }),
>> >>> >>>>> >>       sum)
>> >>> >>>>> >>   }
>> >>> >>>>> >> }
>> >>> >>>>> >>
>> >>> >>>>> >> Error:
>> >>> >>>>> >> 10/14/2014 15:57:35: Job execution switched to status RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to SCHEDULED
>> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to DEPLOYING
>> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to SCHEDULED
>> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to DEPLOYING
>> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
>> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
>> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to SCHEDULED
>> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to DEPLOYING
>> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to SCHEDULED
>> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to DEPLOYING
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to FAILED
>> >>> >>>>> >> java.lang.IllegalStateException: This stub is not part of an iteration step function.
>> >>> >>>>> >>     at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>> >>> >>>>> >>     at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>> >>> >>>>> >>     at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>> >>> >>>>> >>     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>> >>> >>>>> >>     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>> >>> >>>>> >>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>> >>> >>>>> >>     at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>> >>> >>>>> >>     at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>> >>> >>>>> >>     at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>> >>> >>>>> >>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>> >>> >>>>> >>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>> >>> >>>>> >>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>> >>> >>>>> >>     at java.lang.Thread.run(Thread.java:745)
>> >>> >>>>> >>
>> >>> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status FAILING
>> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELING
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
>> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELING
>> >>> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELING
>> >>> >>>>> >> 10/14/2014 15:57:36: Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: DataSink(TextOutputFormat (/tmp/tmplSYJ7S) - UTF-8) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: Sync (WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
>> >>> >>>>> >> 10/14/2014 15:57:36: Map (bumpboost.BumpBoost$$anonfun$8$$anon$2) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: SolutionSet Delta (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status FAILED
>> >>> >>>>> >> Error: The program execution failed: java.lang.IllegalStateException: This stub is not part of an iteration step function.
>> >>> >>>>> >>     at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>> >>> >>>>> >>     at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>> >>> >>>>> >>     at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>> >>> >>>>> >>     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>> >>> >>>>> >>     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>> >>> >>>>> >>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>> >>> >>>>> >>     at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>> >>> >>>>> >>     at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>> >>> >>>>> >>     at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>> >>> >>>>> >>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>> >>> >>>>> >>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>> >>> >>>>> >>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>> >>> >>>>> >>     at java.lang.Thread.run(Thread.java:745)
>> >>> >>>>> >>
>> >>> >>>>> >> On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber
>> >>> >>>>> >> <[hidden email]> wrote:
>> >>> >>>>> >>>
>> >>> >>>>> >>> Should work now.
>> >>> >>>>> >>> Cheers
>> >>> >>>>> >>>
>> >>> >>>>> >>> On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber
>> >>> >>>>> >>> <[hidden email]> wrote:
>> >>> >>>>> >>>>
>> >>> >>>>> >>>> Ok, thanks.
>> >>> >>>>> >>>> Please let me know when it is fixed.
>> >>> >>>>> >>>>
>> >>> >>>>> >>>> Cheers
>> >>> >>>>> >>>> Max
>> >>> >>>>> >>>>
>> >>> >>>>> >>>> On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen
>> >>> >>>>> >>>> <[hidden email]>
>> >>> >>>>> >>>> wrote:
>> >>> >>>>> >>>>>
>> >>> >>>>> >>>>> Thank you, I will look into that...




Re: Forced to use Solution Set in Step Function

Maximilian Alber
Hi Stephan!

It worked with the development snapshot. I never tried it with the new release. If you want me to try it out, just say so.
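
For reference, here is roughly the pattern that now runs, a minimal sketch of the createPlan_find_center snippet from earlier in this thread (Vector, config, residual_2 and the VectorDataSet helpers come from my package, so this is a sketch rather than a self-contained program):

val cumSum = emptyDataSet.iterateDelta(sumVector, config.N, Array("id")) {
  (solutionset, old_sum) =>
    // Pick the residual entry whose id equals the current superstep.
    // Calling getIterationRuntimeContext inside the step function is
    // exactly what used to fail with "This stub is not part of an
    // iteration step function".
    val current = residual_2 filter (new RichFilterFunction[Vector] {
      def filter(x: Vector) = x.id == getIterationRuntimeContext.getSuperstepNumber
    })
    val sum = VectorDataSet.add(old_sum, current)
    // Re-key the running sum with the superstep number and emit it as
    // the solution set delta; the plain sum becomes the next workset.
    val delta = sum map (new RichMapFunction[Vector, Vector] {
      def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
    })
    (delta, sum)
}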

Cheers,
Max

On Mon, Nov 10, 2014 at 5:13 PM, Stephan Ewen <[hidden email]> wrote:
Hi!

The 0.7.0 release should have fixed that problem. Have you had a chance to try that out?

Greetings,
Stephan


On Wed, Oct 15, 2014 at 3:43 PM, Maximilian Alber <[hidden email]> wrote:
@Aljoscha Sorry, I just tried my workaround. There are some minor conceptual bugs (caused by the ids...). I attached the new version. Unfortunately, the compiler breaks there; an issue is already open.

On Wed, Oct 15, 2014 at 11:53 AM, Maximilian Alber <[hidden email]> wrote:
Ok.

Here is an input variant:
flink run -v bump_boost-0.1.jar -c bumpboost.Job x_file=X y_file=Y out_file=/tmp/tmpnWYamw random_file=random_file dimensions=1 N=100 width_candidates_file=/tmp/tmpTJ4LDh iterations=30 multi_bump_boost=0 gradient_descent_iterations=30 cache=False min_width=-4 max_width=6 min_width_update=1e-08 max_width_update=10

width_candidates_file is not needed for now. X, Y and random_file are attached.

If you have problems running it, let me know!
Thanks!

On Tue, Oct 14, 2014 at 10:40 PM, Aljoscha Krettek <[hidden email]> wrote:
Transferring to variables: Unfortunately not possible right now but we
are working on it.

On Tue, Oct 14, 2014 at 8:53 PM, Maximilian Alber
<[hidden email]> wrote:
> @Stephan: Thanks! So I'm gonna switch!
>
> Sorry, my bad. I will provide you some sample by tomorrow morning.
>
> Yes, a workaround, because I cannot transfer them into variables. Can I by
> now (or will I ever)?
>
> Maybe some explanation of my solution:
> - X is for me a matrix of shape (N, d), modeled in Flink as a dataset of
> vectors. Each Vector has an ID, which is the row number, and an array of
> numbers, the actual row.
> - Y is for me a matrix of shape (N, 1), thus actually a column vector.
> - old_sum is either a scalar if d == 1, or a row vector aka matrix of shape
> (1, N), or a Dataset with one Vector. (By now I have the convention to give
> id -1 to them; it comes from a former workaround...)
>
> The whole ID story comes from the fact that I need to know which stuff
> belongs together in mathematical operations (see my zip function). You can
> look that up in util.scala; that's kind of my math library. I don't want to
> imagine the mess in Java :)
>
> Cheers
> Max





Re: Forced to use Solution Set in Step Function

Stephan Ewen
Good to hear that it worked. No need to try out the other version; the same fix is in there...
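
For anyone finding this thread later: with that fix, rich functions inside the step function can read the superstep number, and the step function no longer has to join with the solution set; it can use the solution set purely to accumulate data. A minimal sketch of that shape (initialSolution, initialWorkset and maxIterations are made-up names; Vector is the id-plus-values type from this thread):

val result = initialSolution.iterateDelta(initialWorkset, maxIterations, Array("id")) {
  (solutionSet, workset) =>
    // The superstep number is available in any rich function here.
    val next = workset map (new RichMapFunction[Vector, Vector] {
      def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
    })
    // next serves both as the delta accumulated into the solution set and
    // as the workset for the next superstep; solutionSet itself is never joined.
    (next, next)
}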

On Tue, Nov 11, 2014 at 2:07 PM, Maximilian Alber <[hidden email]> wrote:
Hi Stephan!

It worked with the development snapshot. I never tried it with the new release. If you want me to try it out, just say so.

Cheers,
Max





