Hi Flinksters!

I am trying to write a BulkIteration. Somehow I get a cryptic error message; at least, I have no clue what's wrong.

Code:

var width = env.fromCollection[Vector](Seq(Vector.ones(config.dimensions) * config.startWidth)) map {x => new Vector(0, x.values)}
// Step-size data set: a single Vector built from Vector.ones(config.dimensions) * 0.01,
// re-created with 1 as first constructor argument (presumably the id the filters below test).
var update = env.fromCollection[Vector](Seq(Vector.ones(config.dimensions) * 0.01F)) map {x => new Vector(1, x.values)}
// Previous-gradient data set: a single Vector from Vector.zeros(config.dimensions), tagged with 2.
var lastGradient = env.fromCollection[Vector](Seq(Vector.zeros(config.dimensions))) map {x => new Vector(2, x.values)}
// Combine the three tagged data sets into one so that a single data set can be iterated.
var stepSet = width union update union lastGradient
// Iterate with config.gradientDescentIterations as the iteration count; the result replaces stepSet.
stepSet = stepSet.iterate(config.gradientDescentIterations){
// Step function: the parameter shadows the outer stepSet and is the iteration's input set.
stepSet =>
// Split the combined set back into its three parts by id (these vars shadow the outer ones).
var width = stepSet filter {_.id == 0}
var update = stepSet filter {_.id == 1}
var lastGradient = stepSet filter {_.id == 2}
// getGradient, X, residual and center are defined outside this snippet.
val gradient = getGradient(X, residual, center, width)
// Product of the current and the previous gradient; broadcast to the map below as "term".
val term = gradient * lastGradient
// The current gradient is what gets returned in the lastGradient slot.
// NOTE(review): the next pass selects this slot with id == 2 — confirm getGradient's result carries that id.
lastGradient = gradient
// Rescale the step sizes depending on the broadcast "term".
update = update.map(new RichMapFunction[Vector, Vector]{
// Filled in open() from the broadcast set; null until then.
var term: Vector = null
// Bounds passed to clip() in map().
val minWidthUpdate = 0.00000001F
val maxWidthUpdate = 10.0F
// Takes the first element of the "term" broadcast variable when the function is opened.
override def open(config: Configuration) = {
term = getRuntimeContext.getBroadcastVariable("term").toList.head
}
// Applies condMul with factor 0.5 under term.isLess(0) and factor 1.2 under term.isGreater(0),
// then clips to [minWidthUpdate, maxWidthUpdate].
// Presumably condMul scales only the components selected by the mask — TODO confirm against Vector.
def map(x: Vector) = {x.condMul(term.isLess(0), 0.5F).condMul(term.isGreater(0), 1.2F).clip(minWidthUpdate, maxWidthUpdate)}
}).withBroadcastSet(term, "term")
// Disabled width update: would compute x + update * (gradient sign), clipped to
// [config.minWidth, config.maxWidth], using "update" and "gradient" as broadcast sets.
/*
width = width.map(new RichMapFunction[Vector, Vector]{
var update: Vector = null
var gradient: Vector = null
override def open(config: Configuration) = {
update = getRuntimeContext.getBroadcastVariable("update").toList.head
gradient = getRuntimeContext.getBroadcastVariable("gradient").toList.head
}
def map(x: Vector) = {(x + update * (gradient sign)).clip(config.minWidth, config.maxWidth)}
}).withBroadcastSet(update, "update")withBroadcastSet(gradient, "gradient")
*/
// Result of one pass: with the block above commented out, width is only the filtered input.
width union update union lastGradient
}

Error:

java.lang.IllegalStateException
at org.apache.flink.compiler.dag.BulkPartialSolutionNode.setCandidateProperties(BulkPartialSolutionNode.java:50)
at org.apache.flink.compiler.dag.BulkIterationNode.instantiateCandidate(BulkIterationNode.java:292)
at org.apache.flink.compiler.dag.SingleInputNode.addLocalCandidates(SingleInputNode.java:367)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:315)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at org.apache.flink.compiler.dag.BinaryUnionNode.getAlternativePlans(BinaryUnionNode.java:105)
at org.apache.flink.compiler.dag.BinaryUnionNode.getAlternativePlans(BinaryUnionNode.java:104)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at org.apache.flink.compiler.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:194)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:561)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:197)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:210)
at org.apache.flink.client.program.Client.run(Client.java:288)
at org.apache.flink.client.program.Client.run(Client.java:231)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)

Thanks!

Cheers,
Max
Free forum by Nabble | Edit this page |