
It is currently not supported to union between dynamic and static path in an iteration.

Maximilian Alber
Hi flinksters,

I'm really close to the end (at least I hope so), but I still have some issues.
While writing my final loop, I got this error:

org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: An error occurred while translating the optimized plan to a nephele JobGraph: Error: It is currently not supported to union between dynamic and static path in an iteration.
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:567)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:96)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)

I guess I should use only the loop's step set to create the next step set?

Here is the code:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Vector, config, calcCenter, calcWidthHeight and getKernelVector are not shown in this post.
def createPlanFullIterations(env: ExecutionEnvironment) = {
  // Each parsed line is a pair: the first element goes to X, the second is the initial residual.
  val tmp = env readTextFile config.inFile map { Vector.parseFromSVMLightString(config.dimensions, _) }
  val X = tmp map { _._1 }
  var residual = tmp map { _._2 }
  val randoms = env readTextFile config.randomFile map { Vector.parseFromString(_) }
  val widthCandidates: DataSet[Vector] =
    if (config.widthCandidatesFile != null)
      env readTextFile config.widthCandidatesFile map { Vector.parseFromString(config.dimensions, _) }
    else
      null

  val emptyDataSet = env.fromCollection[Vector](Seq())
  val model = emptyDataSet.iterate(config.iterations) { stepSet =>
    val center = calcCenter(env, X, residual, randoms, -1)

    val x = calcWidthHeight(env, X, residual, widthCandidates, center)
    val width = x._1
    val height = x._2

    // Update the residual: subtract the kernel vector multiplied by the broadcast height.
    residual = residual - (getKernelVector(X, center, width).map(new RichMapFunction[Vector, Vector] {
      var height: Vector = null
      override def open(config: Configuration) = {
        height = getRuntimeContext.getBroadcastVariable("height").toList.head
      }
      def map(x: Vector) = { x * height }
    }).withBroadcastSet(height, "height"))

    // Tag each result with a local id: 0 = center, 1 = width, 2 = height.
    val centerOut = center map { x => new Vector(0, x.values) }
    val widthOut = width map { x => new Vector(1, x.values) }
    val heightOut = height map { x => new Vector(2, x.values) }
    val stepModel = centerOut union widthOut union heightOut

    // Append this step's results to the step set, with ids made unique per superstep.
    stepSet union (stepModel.map(new RichMapFunction[Vector, Vector] {
      def map(x: Vector) = new Vector((getIterationRuntimeContext.getSuperstepNumber - 1) * 3 + x.id, x.values)
    }))
  }

  model map { _ toString } writeAsText config.outFile
}
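
The id arithmetic in the final map can be checked in isolation. What follows is a minimal plain-Scala sketch, not Flink code: `StepIds`, `globalId` and `idsFor` are hypothetical names that only mirror the expression `(superstep - 1) * 3 + id` from the post, assuming supersteps are numbered from 1 and each step emits exactly three vectors with local ids 0 (center), 1 (width) and 2 (height).

```scala
object StepIds {
  // Hypothetical helper mirroring the expression in the posted map function.
  // Assumes superstep starts at 1 and localId is 0 (center), 1 (width) or 2 (height).
  def globalId(superstep: Int, localId: Int): Int =
    (superstep - 1) * 3 + localId

  // All ids produced over the first `supersteps` steps, in emission order.
  def idsFor(supersteps: Int): Seq[Int] =
    for {
      superstep <- 1 to supersteps
      localId   <- 0 to 2
    } yield globalId(superstep, localId)

  def main(args: Array[String]): Unit =
    println(idsFor(3).mkString(", ")) // prints: 0, 1, 2, 3, 4, 5, 6, 7, 8
}
```

Under these assumptions the ids are consecutive and never collide across supersteps, so the order of center, width and height within each step can be recovered from the id alone.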


Thanks!
Cheers,
Max

Re: It is currently not supported to union between dynamic and static path in an iteration.

Stephan Ewen
Hi!

Yes, that is a known limitation. It is fairly straightforward to resolve, but it will take a bit of time. I will try to get to it by the end of the week.

Stephan
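
For readers unfamiliar with the terms in the error message, the limitation can be pictured with a toy model. This is a plain-Scala sketch under stated assumptions, not Flink's optimizer code: it assumes that "dynamic path" means the operators that depend on the iteration's step set and "static path" means loop-invariant inputs. `PathModel`, `Node`, `isDynamic` and `mixesPaths` are hypothetical names. In the posted plan, `stepModel` appears to be built only from `X`, `residual`, `randoms` and `widthCandidates`, none of which read from `stepSet`, so the final union has one input of each kind.

```scala
object PathModel {
  // A node in a toy dataflow plan: a name plus the nodes it reads from.
  // Nodes are compared structurally, so names are assumed to be unique.
  final case class Node(name: String, inputs: List[Node] = Nil)

  // A node is "dynamic" if it is the step set itself or transitively reads from it.
  def isDynamic(node: Node, stepSet: Node): Boolean =
    node == stepSet || node.inputs.exists(isDynamic(_, stepSet))

  // A union mixes paths when it has at least one dynamic and one static input.
  def mixesPaths(union: Node, stepSet: Node): Boolean = {
    val (dynamic, static) = union.inputs.partition(isDynamic(_, stepSet))
    dynamic.nonEmpty && static.nonEmpty
  }

  def main(args: Array[String]): Unit = {
    // Simplified shape of the posted plan: stepModel never reads from stepSet.
    val stepSet   = Node("stepSet")
    val center    = Node("center", List(Node("X"), Node("residual"), Node("randoms")))
    val stepModel = Node("stepModel", List(center))
    val result    = Node("stepSet union stepModel", List(stepSet, stepModel))
    println(mixesPaths(result, stepSet)) // prints: true
  }
}
```

In this model, a union whose inputs all derive from `stepSet` would not be flagged, which matches the guess in the original question.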


On Tue, Dec 9, 2014 at 11:21 AM, Maximilian Alber <[hidden email]> wrote:
> [...]


Re: It is currently not supported to union between dynamic and static path in an iteration.

Maximilian Alber
Thanks!

On Wed, Dec 10, 2014 at 4:35 PM, Stephan Ewen <[hidden email]> wrote:
> [...]