No Nested Iterations??? And where is the Nested Iteration?


Maximilian Alber
Hi Flinksters!

First some good news: the cumsum code from the last issue now works correctly and is tested.

Bad news (at least for me): I just ran into the error below (code follows as well). Do you have a roadmap for when this feature will be available? Regardless of the rest, I will need it in the near future.

So far so good. But I wonder where this nested iteration is supposed to be. At least I do not see one... I have a single iteration with a lot of filters/maps/etc. inside it, but no second iteration.

Cheers,
Max

Error:

org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: An error occurred while translating the optimized plan to a nephele JobGraph: Nested Iterations are not possible at the moment!
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
at org.apache.flink.client.program.Client.run(Client.java:290)
at org.apache.flink.client.program.Client.run(Client.java:285)
at org.apache.flink.client.program.Client.run(Client.java:230)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: Nested Iterations are not possible at the moment!
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
... 14 more
Caused by: org.apache.flink.compiler.CompilerException: Nested Iterations are not possible at the moment!
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
... 33 more
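One plausible reading of the stack trace (the failure surfaces inside `acceptForStepFunction` of a `WorksetIterationPlanNode`) is that the nesting is structural rather than syntactic: `center` is derived from the result of the first delta iteration (`cumSum`, via the broadcast `index`), and `center` is in turn broadcast into the step function of the second delta iteration, so walking that step function's inputs reaches an iteration node. The following is a toy Scala model of such a plan traversal, using illustrative classes only (not Flink's actual plan nodes); it is a guess at the mechanism, not a confirmed diagnosis:

```scala
// Minimal plan-graph model: a data set produced by one iteration and
// broadcast into another iteration's step function makes that step
// function transitively contain an iteration node.
sealed trait PlanNode { def inputs: Seq[PlanNode] }
case class Source(name: String) extends PlanNode {
  def inputs: Seq[PlanNode] = Seq.empty
}
case class MapNode(input: PlanNode, broadcast: Seq[PlanNode] = Seq.empty) extends PlanNode {
  def inputs: Seq[PlanNode] = input +: broadcast
}
case class IterationNode(step: PlanNode) extends PlanNode {
  def inputs: Seq[PlanNode] = Seq(step)
}

// Does the subgraph rooted at `node` contain an iteration?
def containsIteration(node: PlanNode): Boolean = node match {
  case _: IterationNode => true
  case other            => other.inputs.exists(containsIteration)
}

// cumSum is the first iteration; center is computed from its result;
// the second iteration's step function broadcasts center:
val cumSum = IterationNode(MapNode(Source("residual_2")))
val center = MapNode(Source("X"), broadcast = Seq(cumSum))
val step   = MapNode(Source("widthCandidates"), broadcast = Seq(center))

containsIteration(step) // true -> "Nested Iterations are not possible"
```

If this reading is right, the program nests no iterations in the code, but the compiled plan of the second iteration's step function still reaches back into the first one through the broadcast inputs.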

Code:

def createPlanFirstIteration(env: ExecutionEnvironment) = {
    val X = env readTextFile config.xFile map {Vector.parseFromString(config.dimensions, _)}
    val residual = env readTextFile config.yFile map {Vector.parseFromString(_)}
    val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
    val widthCandidates = env readTextFile config.widthCandidatesFile map {Vector.parseFromString(config.dimensions, _)}

    val center = calcCenter(env, X, residual, randoms, 0)

    val x = calcWidthHeight(env, X, residual, widthCandidates, center)

    x map { _ toString } writeAsText config.outFile
}

def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector] = {
    val residual_2 = residual * residual
    val ys = (residual_2 sumV) * (randoms filter {_.id == iteration} neutralize)

    val emptyDataSet = env.fromCollection[Vector](Seq())
    val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
    val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2, config.N+1, Array("id")) {
      (solutionset, workset) =>
        val current = workset filter (new RichFilterFunction[Vector] {
          def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber - 1)
        })
        val old_sum = workset filter {_.id == -1}
        val sum = VectorDataSet.add(old_sum, current.neutralize())

        val new_workset = workset filter {_.id != -1} union sum
        (sum map (new RichMapFunction[Vector, Vector] {
          def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber - 1, x.values)
        }),
        new_workset)
    }

    val index = cumSum.filter(new RichFilterFunction[Vector]() {
      var y: Vector = null
      override def open(config: Configuration) = {
        y = getRuntimeContext.getBroadcastVariable("ys").toList.head
      }
      def filter(x: Vector) = x.values(0) < y.values(0)
    }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0

    val center = X.filter(new RichFilterFunction[Vector]() {
      var index: Int = -1
      override def open(config: Configuration) = {
        val x: Tuple1[Int] = getRuntimeContext.getBroadcastVariable("index").toList.head
        index = x._1
      }
      def filter(x: Vector) = x.id == index
    }).withBroadcastSet(index, "index")

    center neutralize
}
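As a reading aid: the delta iteration in `calcCenter` computes an ordinary prefix sum. Superstep k adds element k to a running total kept under the sentinel id -1 and emits the partial sum into the solution set. Ignoring the distributed bookkeeping, on plain Scala collections this reduces to the following sketch (not the DataSet code itself):

```scala
// Prefix sums: element k of the result is the sum of values(0..k).
// This mirrors what the delta iteration emits into its solution set,
// minus the id bookkeeping and the sentinel running-sum record.
def cumulativeSums(values: Seq[Double]): Seq[Double] =
  values.scanLeft(0.0)(_ + _).tail

cumulativeSums(Seq(1.0, 2.0, 3.0)) // Seq(1.0, 3.0, 6.0)
```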

def getKernelVector(X: DataSet[Vector], center: DataSet[Vector], width: DataSet[Vector]): DataSet[Vector] = {
    X.map(new RichMapFunction[Vector, Vector] {
      var center: Vector = null
      var width: Vector = null
      override def open(config: Configuration) = {
        center = getRuntimeContext.getBroadcastVariable("center").toList.head
        width = getRuntimeContext.getBroadcastVariable("width").toList.head
      }

      def map(x: Vector) = new Vector(x.id, Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
    }).withBroadcastSet(center, "center").withBroadcastSet(width, "width")
}
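For clarity, the arithmetic inside that map is a Gaussian-style kernel. Restated on plain Scala arrays (a sketch of the math only: `kernelValue` is an illustrative helper, not part of the original program, and it assumes the element-wise minus/times/divide semantics of the Vector class):

```scala
// Gaussian-style kernel value of one point x against a center, with a
// per-dimension width vector: exp( - sum_i (x_i - c_i)^2 / w_i ).
def kernelValue(x: Array[Double], center: Array[Double], width: Array[Double]): Double = {
  require(x.length == center.length && x.length == width.length)
  val scaledSqDist =
    x.indices.map(i => (x(i) - center(i)) * (x(i) - center(i)) / width(i)).sum
  math.exp(-scaledSqDist)
}

kernelValue(Array(1.0, 2.0), Array(1.0, 2.0), Array(1.0, 1.0)) // 1.0: zero distance
```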


def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], widthCandidates: DataSet[Vector], center: DataSet[Vector]): DataSet[Vector] = {
    val emptyDataSet = env.fromCollection[Vector](Seq())
    val costs = emptyDataSet.iterateDelta(widthCandidates, config.NWidthCandidates, Array("id")) {
      (solutionset, workset) =>
        val currentWidth = workset filter (new RichFilterFunction[Vector] {
          def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber - 1)
        })

        val kernelVector = getKernelVector(X, center, currentWidth)

        val x1 = kernelVector dot residual map {x => x*x}
        val x2 = kernelVector dot kernelVector

        val cost = (x1 / x2) neutralize

        (cost map (new RichMapFunction[Vector, Vector] {
          def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber - 1, x.values)
        }),
        workset)
    }

// todo: will not work
//val width = costs max(0)

//val kernelVector = getKernelVector(X, center, width)

//val x1 = kernelVector dot residual
//val x2 = kernelVector dot kernelVector
//val height = x1 / x2
    costs
}
Re: No Nested Iterations??? And where is the Nested Iteration?

Stephan Ewen
Hey!

Clearly, this looks like a bug. Let me investigate it and get back to you later...

Greetings,
Stephan


On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <[hidden email]> wrote:
Re: No Nested Iterations??? And where is the Nested Iteration?

Maximilian Alber
Hi Stephan,

have you already had time to investigate this issue?

Cheers,
Max

On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <[hidden email]> wrote:
Re: No Nested Iterations??? And where is the Nested Iteration?

Stephan Ewen
Hi!

I am looking into it right now...

Stephan


On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <[hidden email]> wrote:
Hi Stephan,

you already had time to investigate this issue?

Cheers,
Max

On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <[hidden email]> wrote:
Hey!

Clearly, this looks like a bug. Let me investigate that and get back at you later...

Greetings,
Stephan


On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <[hidden email]> wrote:
Hi Flinksters!

First some good news: the cumsum code from the last issue works now correctly and is tested.

Bad news (at least for me): I just run into this (for the error and code see below). You have a road map when this feature will be available? Regardless of the rest, I would need it in the near future.

So far so good. But I wonder where this nested iteration should be. At least I do not see them... I have an iteration and inside a lot of filters/maps/etc. but not another iteration.

Cheers,
Max

Error:

org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: An error occurred while translating the optimized plan to a nephele JobGraph: Nested Iterations are not possible at the moment!
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
at org.apache.flink.client.program.Client.run(Client.java:290)
at org.apache.flink.client.program.Client.run(Client.java:285)
at org.apache.flink.client.program.Client.run(Client.java:230)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: Nested Iterations are not possible at the moment!
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
... 14 more
Caused by: org.apache.flink.compiler.CompilerException: Nested Iterations are not possible at the moment!
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
... 33 more

Code:

def createPlanFirstIteration(env: ExecutionEnvironment) = {
    val X = env readTextFile config.xFile map { Vector.parseFromString(config.dimensions, _) }
    val residual = env readTextFile config.yFile map { Vector.parseFromString(_) }
    val randoms = env readTextFile config.randomFile map { Vector.parseFromString(_) }
    val widthCandidates = env readTextFile config.widthCandidatesFile map { Vector.parseFromString(config.dimensions, _) }

    val center = calcCenter(env, X, residual, randoms, 0)

    val x = calcWidthHeight(env, X, residual, widthCandidates, center)

    x map { _ toString } writeAsText config.outFile
}

def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector] = {
    val residual_2 = residual * residual
    val ys = (residual_2 sumV) * (randoms filter { _.id == iteration } neutralize)

    val emptyDataSet = env.fromCollection[Vector](Seq())
    val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
    val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2, config.N + 1, Array("id")) {
        (solutionset, workset) =>
            val current = workset filter (new RichFilterFunction[Vector] {
                def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber - 1)
            })
            val old_sum = workset filter { _.id == -1 }
            val sum = VectorDataSet.add(old_sum, current.neutralize())

            val new_workset = workset filter { _.id != -1 } union sum
            (sum map (new RichMapFunction[Vector, Vector] {
                def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber - 1, x.values)
            }),
            new_workset)
    }

    val index = cumSum.filter(new RichFilterFunction[Vector]() {
        var y: Vector = null
        override def open(config: Configuration) = {
            y = getRuntimeContext.getBroadcastVariable("ys").toList.head
        }
        def filter(x: Vector) = x.values(0) < y.values(0)
    }).withBroadcastSet(ys, "ys") map { x: Vector => Tuple1(1) } sum 0

    val center = X.filter(new RichFilterFunction[Vector]() {
        var index: Int = -1
        override def open(config: Configuration) = {
            val x: Tuple1[Int] = getRuntimeContext.getBroadcastVariable("index").toList.head
            index = x._1
        }
        def filter(x: Vector) = x.id == index
    }).withBroadcastSet(index, "index")

    center neutralize
}

def getKernelVector(X: DataSet[Vector], center: DataSet[Vector], width: DataSet[Vector]): DataSet[Vector] = {
    X.map(new RichMapFunction[Vector, Vector] {
        var center: Vector = null
        var width: Vector = null
        override def open(config: Configuration) = {
            center = getRuntimeContext.getBroadcastVariable("center").toList.head
            width = getRuntimeContext.getBroadcastVariable("width").toList.head
        }

        def map(x: Vector) = new Vector(x.id, Array(Math.exp(-((((x - center) * (x - center)) / width).values.sum)).toFloat))
    }).withBroadcastSet(center, "center").withBroadcastSet(width, "width")
}

def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], widthCandidates: DataSet[Vector], center: DataSet[Vector]): DataSet[Vector] = {
    val emptyDataSet = env.fromCollection[Vector](Seq())
    val costs = emptyDataSet.iterateDelta(widthCandidates, config.NWidthCandidates, Array("id")) {
        (solutionset, workset) =>
            val currentWidth = workset filter (new RichFilterFunction[Vector] {
                def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber - 1)
            })

            val kernelVector = getKernelVector(X, center, currentWidth)

            val x1 = kernelVector dot residual map { x => x * x }
            val x2 = kernelVector dot kernelVector

            val cost = (x1 / x2) neutralize

            (cost map (new RichMapFunction[Vector, Vector] {
                def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber - 1, x.values)
            }),
            workset)
    }

    // todo: will not work
    // val width = costs max(0)

    // val kernelVector = getKernelVector(X, center, width)

    // val x1 = kernelVector dot residual
    // val x2 = kernelVector dot kernelVector
    // val height = x1 / x2
    costs
}
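As an aside, the cumulative-sum pattern that the `cumSum` delta iteration above implements can be sketched in plain Scala (no Flink APIs; the `Vec` case class and its field layout are illustrative assumptions, not the actual `Vector` class used above): each step folds the element whose id matches the current step into a running sum and emits the partial sum under that id, mirroring what the step function does superstep by superstep.

```scala
// Plain-Scala sketch of the cumSum step function above (illustrative only):
// step k picks the element with id == k - 1, adds it to the running sum
// (which the Flink version carries in the workset under id == -1), and
// emits the partial sum under the same id.
case class Vec(id: Int, values: Array[Float])

def cumSum(input: Seq[Vec]): Seq[Vec] = {
  var sum = 0f
  input.sortBy(_.id).map { v =>
    sum += v.values(0)      // accumulate, as VectorDataSet.add does
    Vec(v.id, Array(sum))   // emit the partial sum for this step
  }
}
```

This sequential sketch is only meant to show the data dependency: each partial sum depends on the previous one, which is why the Flink version needs one superstep per element.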




Re: No Nested Iterations??? And where is the Nested Iteration?

Stephan Ewen
I found the cause of the bug and have opened a JIRA to track it.


You can watch that one to keep updated.

Stephan


On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <[hidden email]> wrote:
Hi!

I am looking into it right now...

Stephan


On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <[hidden email]> wrote:
Hi Stephan,

you already had time to investigate this issue?

Cheers,
Max

On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <[hidden email]> wrote:
Hey!

Clearly, this looks like a bug. Let me investigate that and get back at you later...

Greetings,
Stephan




Re: No Nested Iterations??? And where is the Nested Iteration?

Maximilian Alber
Ok, thanks.

But does the bug cause Flink to "see" a nested iteration where none exists?
Or is the bug that nested iterations are not supported? If it is the latter, when do you plan to add this feature?
I need nested iterations for my algorithm, so it would be nice to know when I can expect them.

Cheers,
Max



Re: No Nested Iterations??? And where is the Nested Iteration?

Maximilian Alber
Oh sorry, I just read the bug title. So my question is: when are you planning to add nested iterations?

Cheers,
Max

On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <[hidden email]> wrote:
Ok, thanks.

But the bug causes that it Flink "sees" a nested iteration where none is?
Or is it a bug that nested are not supported? If not when you plan to add this feature?
Because I need nested iterations for my algorithm, so it would be nice to know when I can expect them.

Cheers,
Max

On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <[hidden email]> wrote:
I found the cause of the bug and have opened a JIRA to track it.


You can watch that one to keep updated.

Stephan


On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <[hidden email]> wrote:
Hi!

I am looking into it right now...

Stephan


On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <[hidden email]> wrote:
Hi Stephan,

you already had time to investigate this issue?

Cheers,
Max

On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <[hidden email]> wrote:
Hey!

Clearly, this looks like a bug. Let me investigate that and get back at you later...

Greetings,
Stephan


On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <[hidden email]> wrote:
Hi Flinksters!

First some good news: the cumsum code from the last issue works now correctly and is tested.

Bad news (at least for me): I just run into this (for the error and code see below). You have a road map when this feature will be available? Regardless of the rest, I would need it in the near future.

So far so good. But I wonder where this nested iteration should be. At least I do not see them... I have an iteration and inside a lot of filters/maps/etc. but not another iteration.

Cheers,
Max

Error:

org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: An error occurred while translating the optimized plan to a nephele JobGraph: Nested Iterations are not possible at the moment!
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
at org.apache.flink.client.program.Client.run(Client.java:290)
at org.apache.flink.client.program.Client.run(Client.java:285)
at org.apache.flink.client.program.Client.run(Client.java:230)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: Nested Iterations are not possible at the moment!
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
... 14 more
Caused by: org.apache.flink.compiler.CompilerException: Nested Iterations are not possible at the moment!
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
... 33 more

Code:

def createPlanFirstIteration(env: ExecutionEnvironment) = {
    val X = env readTextFile config.xFile map {Vector.parseFromString(config.dimensions, _)}
    val residual = env readTextFile config.yFile map {Vector.parseFromString(_)}
    val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
    val widthCandidates = env readTextFile config.widthCandidatesFile map {Vector.parseFromString(config.dimensions, _)}

    val center = calcCenter(env, X, residual, randoms, 0)

    val x = calcWidthHeight(env, X, residual, widthCandidates, center)

    x map { _ toString } writeAsText config.outFile
}

def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector] = {
    val residual_2 = residual * residual
    val ys = (residual_2 sumV) * (randoms filter {_.id == iteration} neutralize)

    val emptyDataSet = env.fromCollection[Vector](Seq())
    val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
    val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2, config.N+1, Array("id")) {
      (solutionset, workset) =>
      val current = workset filter (new RichFilterFunction[Vector]{
        def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
       })
      val old_sum = workset filter {_.id == -1}
      val sum = VectorDataSet.add(old_sum, current.neutralize())

      val new_workset = workset filter {_.id != -1} union sum
       (sum map (new RichMapFunction[Vector, Vector]{
          def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
       }),
      new_workset)
     }
   val index = cumSum.filter(new RichFilterFunction[Vector](){
      var y: Vector = null
     override def open(config: Configuration) = {
        y = getRuntimeContext.getBroadcastVariable("ys").toList.head
     }
     def filter(x: Vector) = x.values(0) < y.values(0)
   }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0

    val center = X.filter(new RichFilterFunction[Vector](){
    var index: Int = -1
    override def open(config: Configuration) = {
      val x: Tuple1[Int] = getRuntimeContext.getBroadcastVariable("index").toList.head
      index = x._1
       }
      def filter(x: Vector) = x.id == index
    }).withBroadcastSet(index, "index")

    center neutralize
}

def getKernelVector(X: DataSet[Vector], center: DataSet[Vector], width: DataSet[Vector]): DataSet[Vector] = {
    X.map(new RichMapFunction[Vector, Vector]{
      var center: Vector = null
      var width: Vector = null
      override def open(config: Configuration) = {
       center = getRuntimeContext.getBroadcastVariable("center").toList.head
       width = getRuntimeContext.getBroadcastVariable("width").toList.head
     }

    def map(x: Vector) = new Vector(x.id, Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
    }).withBroadcastSet(center, "center").withBroadcastSet(width, "width")
}


def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], widthCandidates: DataSet[Vector], center: DataSet[Vector]): DataSet[Vector] = {
    val emptyDataSet = env.fromCollection[Vector](Seq())
    val costs = emptyDataSet.iterateDelta(widthCandidates, config.NWidthCandidates, Array("id")) {
      (solutionset, workset) =>
        val currentWidth = workset filter (new RichFilterFunction[Vector] {
          def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber - 1)
        })

        val kernelVector = getKernelVector(X, center, currentWidth)

        val x1 = kernelVector dot residual map {x => x*x}
        val x2 = kernelVector dot kernelVector

        val cost = (x1 / x2) neutralize

        (cost map (new RichMapFunction[Vector, Vector] {
          def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber - 1, x.values)
        }),
        workset)
    }

    // todo: will not work
    // val width = costs max(0)
    // val kernelVector = getKernelVector(X, center, width)
    // val x1 = kernelVector dot residual
    // val x2 = kernelVector dot kernelVector
    // val height = x1 / x2
    costs
}
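calcWidthHeight scores one width candidate per superstep with cost = (k . r)^2 / (k . k), where k is the kernel vector over all samples and r the residual. The same scoring can be sketched on plain Scala collections, with a loop over candidates in place of the superstep-per-candidate delta iteration (the names and collection types below are illustrative stand-ins, not the Flink API):

```scala
object WidthCostSketch {
  // Gaussian kernel value for one sample, given a center and a candidate width
  def kernel(x: Array[Double], c: Array[Double], w: Array[Double]): Double =
    math.exp(-x.indices.map(i => (x(i) - c(i)) * (x(i) - c(i)) / w(i)).sum)

  // cost(width) = (k . r)^2 / (k . k), where k is the kernel vector over all samples
  def cost(samples: Seq[Array[Double]], residual: Seq[Double],
           center: Array[Double], width: Array[Double]): Double = {
    val k  = samples.map(kernel(_, center, width))
    val kr = k.zip(residual).map { case (a, b) => a * b }.sum
    val kk = k.map(a => a * a).sum
    kr * kr / kk
  }

  // One cost per width candidate, as a plain loop instead of one superstep per candidate
  def allCosts(samples: Seq[Array[Double]], residual: Seq[Double],
               center: Array[Double], widthCandidates: Seq[Array[Double]]): Seq[Double] =
    widthCandidates.map(cost(samples, residual, center, _))
}
```

This makes the structure visible: the delta iteration is only being used as a sequential for-loop over candidates, which is exactly the "roll-out" shape discussed below in the thread.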






Re: No Nested Iterations??? And where is the Nested Iteration?

Stephan Ewen
We are not planning to add closed-loop nested iterations in the near future. That is a bit of an effort, and I don't think anyone can pick it up very soon.

We will soon support roll-out iterations (for-loop style) much more efficiently. There is no reason why you could not nest two for-loops; however, those are only bulk-style, not delta-iteration style.

If you would like to contribute iteration nesting, I can help you get started.

Greetings,
Stephan
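Roll-out nesting means driving the outer loop from the client program, running one complete inner iteration per outer step instead of compiling a single nested plan. A minimal plain-Scala sketch of the shape (using a local function in place of a Flink bulk iteration; the step functions are purely illustrative):

```scala
object RollOutSketch {
  // Stand-in for one Flink bulk iteration: apply `step` a fixed number of times.
  def bulkIterate[T](initial: T, steps: Int)(step: T => T): T =
    (0 until steps).foldLeft(initial)((state, _) => step(state))

  def main(args: Array[String]): Unit = {
    // Outer "rolled-out" loop in the driver program; each pass runs an inner bulk iteration.
    var value = 1.0
    for (outer <- 0 until 3) {
      // Inner iteration: halve the value 4 times (illustrative step function).
      value = bulkIterate(value, 4)(v => v / 2.0)
    }
    println(value) // 1.0 / 2^12
  }
}
```

The trade-off is that each outer pass is a separate job-graph round trip, which is what the mentioned efficiency work would address.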


Re: No Nested Iterations??? And where is the Nested Iteration?

Maximilian Alber
Ok. By for-loop style, do you mean a loop with a fixed range?
In my case I would have a delta iteration inside a bulk iteration. I guess that wouldn't be "roll-out-able"?

Btw, is there any intention to allow bulk-style iterations on several datasets "concurrently"?

Maybe we could discuss my problem next week at the meetup?

Thank you for the offer, but I'm in the middle of my thesis, so I don't have time for it.

Cheers,
Max

Re: No Nested Iterations??? And where is the Nested Iteration?

Stephan Ewen
Talking at the meetup sounds good!

On Wed, Nov 12, 2014 at 5:15 PM, Maximilian Alber <[hidden email]> wrote:
Ok. With for loop style you intend a loop with a fixed range?
In my case I would have a delta-iteration inside a bulk-iteration. I guess wouldn't be "roll-out-able"?

Btw is there any intention to allow bulk-style iterations on several datasets "concurrently"?

Maybe we could discuss my problem next week at the meetup?

Thank you for the offer, but I'm in the middle of thesis, thus I don't have time for it.

Cheers,
Max

On Wed, Nov 12, 2014 at 4:59 PM, Stephan Ewen <[hidden email]> wrote:
We are not planning to add closed-loop nested iterations in the near future. That is a bit of an effort and so far, and I think no one can pick that up very soon.

We will be supporting roll-out iterations (for loop style) much more efficiently soon. There is no reason why you could not nest two for-loops. However, those are only bulk-style, not delta-iteration style.

If you would like to contribute iteration nesting, I could help you to get started.

Greetings,
Stephan


On Wed, Nov 12, 2014 at 4:47 PM, Maximilian Alber <[hidden email]> wrote:
Oh sorry, I just read the bug title. So my questions is when you are planning to add nested iterations?

Cheers,
Max

On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <[hidden email]> wrote:
Ok, thanks.

But the bug causes that it Flink "sees" a nested iteration where none is?
Or is it a bug that nested are not supported? If not when you plan to add this feature?
Because I need nested iterations for my algorithm, so it would be nice to know when I can expect them.

Cheers,
Max

On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <[hidden email]> wrote:
I found the cause of the bug and have opened a JIRA to track it.


You can watch that one to keep updated.

Stephan


On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <[hidden email]> wrote:
Hi!

I am looking into it right now...

Stephan


On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <[hidden email]> wrote:
Hi Stephan,

you already had time to investigate this issue?

Cheers,
Max

On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <[hidden email]> wrote:
Hey!

Clearly, this looks like a bug. Let me investigate that and get back at you later...

Greetings,
Stephan


On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <[hidden email]> wrote:
Hi Flinksters!

First some good news: the cumsum code from the last issue works now correctly and is tested.

Bad news (at least for me): I just run into this (for the error and code see below). You have a road map when this feature will be available? Regardless of the rest, I would need it in the near future.

So far so good. But I wonder where this nested iteration should be. At least I do not see them... I have an iteration and inside a lot of filters/maps/etc. but not another iteration.

Cheers,
Max

Error:

org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: An error occurred while translating the optimized plan to a nephele JobGraph: Nested Iterations are not possible at the moment!
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
at org.apache.flink.client.program.Client.run(Client.java:290)
at org.apache.flink.client.program.Client.run(Client.java:285)
at org.apache.flink.client.program.Client.run(Client.java:230)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: Nested Iterations are not possible at the moment!
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
... 14 more
Caused by: org.apache.flink.compiler.CompilerException: Nested Iterations are not possible at the moment!
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
... 33 more

Code:

def createPlanFirstIteration(env: ExecutionEnvironment) = {
    val X = env readTextFile config.xFile map {Vector.parseFromString(config.dimensions, _)}
    val residual = env readTextFile config.yFile map {Vector.parseFromString(_)}
    val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
    val widthCandidates = env readTextFile config.widthCandidatesFile map {Vector.parseFromString(config.dimensions, _)}

    val center = calcCenter(env, X, residual, randoms, 0)

    val x = calcWidthHeight(env, X, residual, widthCandidates, center)

    x map { _ toString } writeAsText config.outFile
}

def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector] = {
    val residual_2 = residual * residual
    val ys = (residual_2 sumV) * (randoms filter {_.id == iteration} neutralize)

    val emptyDataSet = env.fromCollection[Vector](Seq())
    val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
    val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2, config.N+1, Array("id")) {
      (solutionset, workset) =>
      val current = workset filter (new RichFilterFunction[Vector]{
        def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
       })
      val old_sum = workset filter {_.id == -1}
      val sum = VectorDataSet.add(old_sum, current.neutralize())

      val new_workset = workset filter {_.id != -1} union sum
       (sum map (new RichMapFunction[Vector, Vector]{
          def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
       }),
      new_workset)
     }
   val index = cumSum.filter(new RichFilterFunction[Vector](){
      var y: Vector = null
     override def open(config: Configuration) = {
        y = getRuntimeContext.getBroadcastVariable("ys").toList.head
     }
     def filter(x: Vector) = x.values(0) < y.values(0)
   }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0

    val center = X.filter(new RichFilterFunction[Vector](){
    var index: Int = -1
    override def open(config: Configuration) = {
      val x: Tuple1[Int] = getRuntimeContext.getBroadcastVariable("index").toList.head
      index = x._1
       }
      def filter(x: Vector) = x.id == index
    }).withBroadcastSet(index, "index")

    center neutralize
}

def getKernelVector(X: DataSet[Vector], center: DataSet[Vector], width: DataSet[Vector]): DataSet[Vector] = {
    X.map(new RichMapFunction[Vector, Vector]{
      var center: Vector = null
      var width: Vector = null
      override def open(config: Configuration) = {
       center = getRuntimeContext.getBroadcastVariable("center").toList.head
       width = getRuntimeContext.getBroadcastVariable("width").toList.head
     }

    def map(x: Vector) = new Vector(x.id, Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
    }).withBroadcastSet(center, "center").withBroadcastSet(width, "width")
}


def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], widthCandidates: DataSet[Vector], center: DataSet[Vector]): DataSet[Vector] = {
    val emptyDataSet = env.fromCollection[Vector](Seq())
    val costs = emptyDataSet.iterateDelta(widthCandidates, config.NWidthCandidates, Array("id")) {
      (solutionset, workset) =>
        val currentWidth = workset filter (new RichFilterFunction[Vector] {
          def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber - 1)
        })

        val kernelVector = getKernelVector(X, center, currentWidth)

        val x1 = kernelVector dot residual map { x => x * x }
        val x2 = kernelVector dot kernelVector

        val cost = (x1 / x2) neutralize

        (cost map (new RichMapFunction[Vector, Vector] {
          def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber - 1, x.values)
        }),
        workset)
    }

    // todo: will not work
    // val width = costs max(0)
    // val kernelVector = getKernelVector(X, center, width)
    // val x1 = kernelVector dot residual
    // val x2 = kernelVector dot kernelVector
    // val height = x1 / x2
    costs
}
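Regarding the `// todo: will not work` note: a field-wise `max(0)` would not keep the best candidate as a whole row. One hedged alternative, assuming each `Vector` in `costs` carries the candidate id in `id` and the scalar cost in `values(0)`, is a manual reduce (a sketch only, untested against the custom `Vector` type):

```scala
// Keep the whole Vector whose cost (values(0)) is largest,
// rather than taking per-field maxima across all rows.
val bestWidth: DataSet[Vector] = costs.reduce {
  (a, b) => if (a.values(0) >= b.values(0)) a else b
}
```

The winning id could then select the actual width candidate, for example via a broadcast set as done elsewhere in this code.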










Re: No Nested Iterations??? And where is the Nested Iteration?

Stephan Ewen
The current master contains a fix for the incorrectly identified nested iteration bug.

Please let us know if it fixes your problem!

Greetings,
Stephan


On Wed, Nov 12, 2014 at 5:19 PM, Stephan Ewen <[hidden email]> wrote:
Talking at the meetup sounds good!

On Wed, Nov 12, 2014 at 5:15 PM, Maximilian Alber <[hidden email]> wrote:
Ok. By "for loop style" you mean a loop with a fixed range?
In my case I would have a delta-iteration inside a bulk-iteration. I guess that wouldn't be "roll-out-able"?

Btw, is there any intention to allow bulk-style iterations on several datasets "concurrently"?

Maybe we could discuss my problem next week at the meetup?

Thank you for the offer, but I'm in the middle of my thesis, so I don't have time for it.

Cheers,
Max

On Wed, Nov 12, 2014 at 4:59 PM, Stephan Ewen <[hidden email]> wrote:
We are not planning to add closed-loop nested iterations in the near future. That is a bit of an effort, and I don't think anyone can pick it up very soon.

We will be supporting roll-out iterations (for loop style) much more efficiently soon. There is no reason why you could not nest two for-loops. However, those are only bulk-style, not delta-iteration style.
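A hedged sketch of what such a rolled-out nesting could look like with the Scala DataSet API — the step function and iteration counts are made up for illustration, and `env` is assumed to be an `ExecutionEnvironment`:

```scala
// Illustrative only: an outer plain-Scala for-loop around an inner
// bulk iteration. Each outer pass appends one more flat iteration
// to the plan, so no iteration operator is nested inside another.
var data: DataSet[Long] = env.fromElements(1L, 2L, 3L)
for (_ <- 1 to 3) {            // outer, rolled-out "loop"
  data = data.iterate(10) {    // inner bulk iteration
    ds => ds.map(_ + 1)        // made-up step function
  }
}
```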

If you would like to contribute iteration nesting, I could help you to get started.

Greetings,
Stephan


On Wed, Nov 12, 2014 at 4:47 PM, Maximilian Alber <[hidden email]> wrote:
Oh sorry, I just read the bug title. So my question is: when are you planning to add nested iterations?

Cheers,
Max

On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <[hidden email]> wrote:
Ok, thanks.

But does the bug cause Flink to "see" a nested iteration where there is none?
Or is it a bug that nested iterations are not supported? If so, when do you plan to add this feature?
Because I need nested iterations for my algorithm, so it would be nice to know when I can expect them.

Cheers,
Max

On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <[hidden email]> wrote:
I found the cause of the bug and have opened a JIRA to track it.


You can watch that one to keep updated.

Stephan


On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <[hidden email]> wrote:
Hi!

I am looking into it right now...

Stephan


On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <[hidden email]> wrote:
Hi Stephan,

you already had time to investigate this issue?

Cheers,
Max

On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <[hidden email]> wrote:
Hey!

Clearly, this looks like a bug. Let me investigate that and get back at you later...

Greetings,
Stephan













Re: No Nested Iterations??? And where is the Nested Iteration?

Maximilian Alber
The error is gone.
Thanks!

Cheers,
Max
