Hi Flinksters!

The current stable Flink compiler rejects my plan. But I don't have a clue why. The causing line of code is marked:

Code:

def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], widthCandidates: DataSet[Vector], center: DataSet[Vector]): DataSet[Vector] = {
  val emptyDataSet = env.fromCollection[Vector](Seq())
  val costs = emptyDataSet.iterateDelta(widthCandidates, config.NWidthCandidates, Array("id")) {
    (solutionset, workset) =>
      val currentWidth = workset filter (new RichFilterFunction[Vector]{
        def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
      })

      val kernelVector = getKernelVector(X, center, currentWidth)

      val x1 = kernelVector dot residual map {x => x*x}
      val x2 = kernelVector dot kernelVector

      val cost = (x1 / x2) neutralize

      (cost map (new RichMapFunction[Vector, Vector]{
        def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
      }),
      workset)
  }

  val maxCost = costs max(0)
  >>>>>>>>>>>>>>>>>>>>>>>>>
  val width = maxCost join widthCandidates where "id" equalTo "id" map {x => x._2}
  >>>>>>>>>>>>>>>>>>>>>>>>>
  //val kernelVector = getKernelVector(X, center, width)
  //val x1 = kernelVector dot residual
  //val x2 = kernelVector dot kernelVector
  //val height = x1 / x2
  //costs
  width
}

The error message is:

java.lang.IllegalArgumentException: The given strategy does not work on two inputs.
    at org.apache.flink.runtime.operators.DriverStrategy.secondDam(DriverStrategy.java:164)
    at org.apache.flink.compiler.plan.DualInputPlanNode.hasDamOnPathDownTo(DualInputPlanNode.java:224)
    at org.apache.flink.compiler.plan.WorksetIterationPlanNode.hasDamOnPathDownTo(WorksetIterationPlanNode.java:172)
    at org.apache.flink.compiler.plan.SingleInputPlanNode.hasDamOnPathDownTo(SingleInputPlanNode.java:235)
    at org.apache.flink.compiler.dag.TwoInputNode.placePipelineBreakersIfNecessary(TwoInputNode.java:585)
    at org.apache.flink.compiler.dag.TwoInputNode.instantiate(TwoInputNode.java:546)
    at org.apache.flink.compiler.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:497)
    at org.apache.flink.compiler.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:431)
    at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
    at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
    at org.apache.flink.compiler.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:194)
    at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:561)
    at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
    at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
    at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
    at org.apache.flink.client.program.Client.run(Client.java:285)
    at org.apache.flink.client.program.Client.run(Client.java:230)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)

Thanks!
Cheers,
Max
Let us have a look...

On Thu, Nov 13, 2014 at 12:32 PM, Maximilian Alber <[hidden email]> wrote: