Re: Illegal State in Bulk Iteration

Posted by Maximilian Alber on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Illegal-State-in-Bulk-Iteration-tp492p539.html

Sorry! Sure! Please use the settings I posted above, I just saw in other cases I get other strange error messages...

On Mon, Dec 8, 2014 at 8:36 PM, Stephan Ewen <[hidden email]> wrote:
Hey!

Could you share the source code with us? It makes the debugging much much simpler ;-)

Stephan


On Thu, Dec 4, 2014 at 1:40 PM, Maximilian Alber <[hidden email]> wrote:
Could you reproduce the error now?


On Mon, Dec 1, 2014 at 5:12 PM, Maximilian Alber <[hidden email]> wrote:
Sure.
Those are the params (in_file and random_file need to be set, out_file you can choose a name.):

flink 'run', '-v', '/media/alber_disk/alber/Arbeit/Uni/Kurse/MA/repo/python/implementations/../../scripts/../flink/bump_boost/target/bump_boost-0.1.jar', '-c', 'bumpboost.Job', 'in_file=/tmp/tmpxj8LsC', 'out_file=/tmp/tmpuDSlIn', 'random_file=/tmp/tmpp_dFYE', 'dimensions=1', 'N=100', 'iterations=30', 'multi_bump_boost=1', 'gradient_descent_iterations=30', 'cache=False', 'start_width=1.0', 'min_width=-4', 'max_width=6', 'min_width_update=1e-08', 'max_width_update=10'

On Mon, Dec 1, 2014 at 4:47 PM, Stephan Ewen <[hidden email]> wrote:
Hi!

I have been trying to re-implement the program you sent here (the code is incomplete), but I cannot trigger the exception. Can you send us the complete example?

Stephan


On Fri, Nov 28, 2014 at 3:18 PM, Maximilian Alber <[hidden email]> wrote:
Hi Flinksters!

I try to write a BulkIteration. Somehow I get a cryptic error message, at least I have no clue what's wrong:

Code:

var width = env.fromCollection[Vector](Seq(Vector.ones(config.dimensions) * config.startWidth)) map {x => new Vector(0, x.values)}
var update = env.fromCollection[Vector](Seq(Vector.ones(config.dimensions) * 0.01F)) map {x => new Vector(1, x.values)}
var lastGradient = env.fromCollection[Vector](Seq(Vector.zeros(config.dimensions))) map {x => new Vector(2, x.values)}

var stepSet = width union update union lastGradient
stepSet = stepSet.iterate(config.gradientDescentIterations){
    stepSet =>
    var width = stepSet filter {_.id == 0}
    var update = stepSet filter {_.id == 1}
    var lastGradient = stepSet filter {_.id == 2}

    val gradient = getGradient(X, residual, center, width)
    val term = gradient * lastGradient
    lastGradient = gradient

    update = update.map(new RichMapFunction[Vector, Vector]{
      var term: Vector = null
      val minWidthUpdate = 0.00000001F
      val maxWidthUpdate = 10.0F
      override def open(config: Configuration) = {
       term = getRuntimeContext.getBroadcastVariable("term").toList.head
    }

    def map(x: Vector) = {x.condMul(term.isLess(0), 0.5F).condMul(term.isGreater(0), 1.2F).clip(minWidthUpdate, maxWidthUpdate)}
    }).withBroadcastSet(term, "term")
    /*
    width = width.map(new RichMapFunction[Vector, Vector]{
      var update: Vector = null
      var gradient: Vector = null
      override def open(config: Configuration) = {
        update = getRuntimeContext.getBroadcastVariable("update").toList.head
       gradient = getRuntimeContext.getBroadcastVariable("gradient").toList.head
     }

    def map(x: Vector) = {(x + update * (gradient sign)).clip(config.minWidth, config.maxWidth)}
    }).withBroadcastSet(update, "update")withBroadcastSet(gradient, "gradient")
    */

    width union update union lastGradient
}


Error:

java.lang.IllegalStateException
at org.apache.flink.compiler.dag.BulkPartialSolutionNode.setCandidateProperties(BulkPartialSolutionNode.java:50)
at org.apache.flink.compiler.dag.BulkIterationNode.instantiateCandidate(BulkIterationNode.java:292)
at org.apache.flink.compiler.dag.SingleInputNode.addLocalCandidates(SingleInputNode.java:367)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:315)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at org.apache.flink.compiler.dag.BinaryUnionNode.getAlternativePlans(BinaryUnionNode.java:105)
at org.apache.flink.compiler.dag.BinaryUnionNode.getAlternativePlans(BinaryUnionNode.java:104)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at org.apache.flink.compiler.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:194)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:561)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:197)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:210)
at org.apache.flink.client.program.Client.run(Client.java:288)
at org.apache.flink.client.program.Client.run(Client.java:231)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)

Thanks!
Cheers,
Max






bump_boost.tar.gz (674K) Download Attachment