It is currently not supported to union between dynamic and static path in an iteration.

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

It is currently not supported to union between dynamic and static path in an iteration.

Maximilian Alber
Hi flinksters,

I'm really close to the end (at least I hope so), but I still have some issues.
Writing my final loop I got this error:

org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: An error occurred while translating the optimized plan to a nephele JobGraph: Error: It is currently not supported to union between dynamic and static path in an iteration.
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:567)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:96)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)

I guess that I should use just the loop step set to create the next step set?

Here is the code:

def createPlanFullIterations(env: ExecutionEnvironment) = {
   val tmp = env readTextFile config.inFile map {Vector.parseFromSVMLightString (config.dimensions, _)}
   val X = tmp map {_._1}
   var residual = tmp map {_._2}
   val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
   val widthCandidates: DataSet[Vector] = if(config.widthCandidatesFile != null)
     env readTextFile config.widthCandidatesFile map {Vector.parseFromString(config.dimensions, _)}
   else 
      null

   val emptyDataSet = env.fromCollection[Vector](Seq())
   val model = emptyDataSet.iterate(config.iterations){
     stepSet =>
     val center = calcCenter(env, X, residual, randoms, -1)

     val x = calcWidthHeight(env, X, residual, widthCandidates, center)
     val width = x._1
     val height = x._2

     residual = residual - (getKernelVector(X, center, width).map(new RichMapFunction[Vector, Vector]{
       var height: Vector = null
       override def open(config: Configuration) = {
         height = getRuntimeContext.getBroadcastVariable("height").toList.head
       }
       def map(x: Vector) = {x * height}
       }).withBroadcastSet(height, "height"))

     val centerOut = center map {x => new Vector(0, x.values)}
     val widthOut = width map {x => new Vector(1, x.values)}
     val heightOut = height map {x => new Vector(2, x.values)}
     val stepModel = centerOut union widthOut union height

     stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
     def map(x: Vector) = new Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
     }))
    }

    model map { _ toString } writeAsText config.outFile
}


thanks!
Cheers,
Max
Reply | Threaded
Open this post in threaded view
|

Re: It is currently not supported to union between dynamic and static path in an iteration.

Stephan Ewen
Hi!

Yes, that is a known limitation. It is sort of straightforward to resolve, but will take a bit of time. I will try and get to it until the end of the week.

Stephan


On Tue, Dec 9, 2014 at 11:21 AM, Maximilian Alber <[hidden email]> wrote:
Hi flinksters,

I'm really close to the end (at least I hope so), but I still have some issues.
Writing my final loop I got this error:

org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: An error occurred while translating the optimized plan to a nephele JobGraph: Error: It is currently not supported to union between dynamic and static path in an iteration.
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:567)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:96)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)

I guess that I should use just the loop step set to create the next step set?

Here is the code:

def createPlanFullIterations(env: ExecutionEnvironment) = {
   val tmp = env readTextFile config.inFile map {Vector.parseFromSVMLightString (config.dimensions, _)}
   val X = tmp map {_._1}
   var residual = tmp map {_._2}
   val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
   val widthCandidates: DataSet[Vector] = if(config.widthCandidatesFile != null)
     env readTextFile config.widthCandidatesFile map {Vector.parseFromString(config.dimensions, _)}
   else 
      null

   val emptyDataSet = env.fromCollection[Vector](Seq())
   val model = emptyDataSet.iterate(config.iterations){
     stepSet =>
     val center = calcCenter(env, X, residual, randoms, -1)

     val x = calcWidthHeight(env, X, residual, widthCandidates, center)
     val width = x._1
     val height = x._2

     residual = residual - (getKernelVector(X, center, width).map(new RichMapFunction[Vector, Vector]{
       var height: Vector = null
       override def open(config: Configuration) = {
         height = getRuntimeContext.getBroadcastVariable("height").toList.head
       }
       def map(x: Vector) = {x * height}
       }).withBroadcastSet(height, "height"))

     val centerOut = center map {x => new Vector(0, x.values)}
     val widthOut = width map {x => new Vector(1, x.values)}
     val heightOut = height map {x => new Vector(2, x.values)}
     val stepModel = centerOut union widthOut union height

     stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
     def map(x: Vector) = new Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
     }))
    }

    model map { _ toString } writeAsText config.outFile
}


thanks!
Cheers,
Max

Reply | Threaded
Open this post in threaded view
|

Re: It is currently not supported to union between dynamic and static path in an iteration.

Maximilian Alber
Thanks!

On Wed, Dec 10, 2014 at 4:35 PM, Stephan Ewen <[hidden email]> wrote:
Hi!

Yes, that is a known limitation. It is sort of straightforward to resolve, but will take a bit of time. I will try and get to it until the end of the week.

Stephan


On Tue, Dec 9, 2014 at 11:21 AM, Maximilian Alber <[hidden email]> wrote:
Hi flinksters,

I'm really close to the end (at least I hope so), but I still have some issues.
Writing my final loop I got this error:

org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: An error occurred while translating the optimized plan to a nephele JobGraph: Error: It is currently not supported to union between dynamic and static path in an iteration.
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:567)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:96)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)

I guess that I should use just the loop step set to create the next step set?

Here is the code:

def createPlanFullIterations(env: ExecutionEnvironment) = {
   val tmp = env readTextFile config.inFile map {Vector.parseFromSVMLightString (config.dimensions, _)}
   val X = tmp map {_._1}
   var residual = tmp map {_._2}
   val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
   val widthCandidates: DataSet[Vector] = if(config.widthCandidatesFile != null)
     env readTextFile config.widthCandidatesFile map {Vector.parseFromString(config.dimensions, _)}
   else 
      null

   val emptyDataSet = env.fromCollection[Vector](Seq())
   val model = emptyDataSet.iterate(config.iterations){
     stepSet =>
     val center = calcCenter(env, X, residual, randoms, -1)

     val x = calcWidthHeight(env, X, residual, widthCandidates, center)
     val width = x._1
     val height = x._2

     residual = residual - (getKernelVector(X, center, width).map(new RichMapFunction[Vector, Vector]{
       var height: Vector = null
       override def open(config: Configuration) = {
         height = getRuntimeContext.getBroadcastVariable("height").toList.head
       }
       def map(x: Vector) = {x * height}
       }).withBroadcastSet(height, "height"))

     val centerOut = center map {x => new Vector(0, x.values)}
     val widthOut = width map {x => new Vector(1, x.values)}
     val heightOut = height map {x => new Vector(2, x.values)}
     val stepModel = centerOut union widthOut union height

     stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
     def map(x: Vector) = new Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
     }))
    }

    model map { _ toString } writeAsText config.outFile
}


thanks!
Cheers,
Max