The given strategy does not work on two inputs.

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

The given strategy does not work on two inputs.

Maximilian Alber
Hi Flinksters!

The current stable Flink compiler rejects my plan. But I don't have a clue why. The causing line of code is marked:

Code: 

def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], widthCandidates: DataSet[Vector], center: DataSet[Vector]): DataSet[Vector] = {
    val emptyDataSet = env.fromCollection[Vector](Seq())
    val costs = emptyDataSet.iterateDelta(widthCandidates, config.NWidthCandidates, Array("id")) {
      (solutionset, workset) =>
      val currentWidth = workset filter (new RichFilterFunction[Vector]{
       def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
      })

     val kernelVector = getKernelVector(X, center, currentWidth)

     val x1 = kernelVector dot residual map {x => x*x}
     val x2 = kernelVector dot kernelVector

     val cost = (x1 / x2) neutralize


     (cost map (new RichMapFunction[Vector, Vector]{
      def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
     }),
     workset)
    }

    val maxCost = costs max(0)
>>>>>>>>>>>>>>>>>>>>>>>>>
    val width = maxCost join widthCandidates where "id" equalTo "id" map {x => x._2}
>>>>>>>>>>>>>>>>>>>>>>>>>

    //val kernelVector = getKernelVector(X, center, width)

    //val x1 = kernelVector dot residual
    //val x2 = kernelVector dot kernelVector
    //val height = x1 / x2
    //costs
    width
}


The error message is:

java.lang.IllegalArgumentException: The given strategy does not work on two inputs.
at org.apache.flink.runtime.operators.DriverStrategy.secondDam(DriverStrategy.java:164)
at org.apache.flink.compiler.plan.DualInputPlanNode.hasDamOnPathDownTo(DualInputPlanNode.java:224)
at org.apache.flink.compiler.plan.WorksetIterationPlanNode.hasDamOnPathDownTo(WorksetIterationPlanNode.java:172)
at org.apache.flink.compiler.plan.SingleInputPlanNode.hasDamOnPathDownTo(SingleInputPlanNode.java:235)
at org.apache.flink.compiler.dag.TwoInputNode.placePipelineBreakersIfNecessary(TwoInputNode.java:585)
at org.apache.flink.compiler.dag.TwoInputNode.instantiate(TwoInputNode.java:546)
at org.apache.flink.compiler.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:497)
at org.apache.flink.compiler.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:431)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at org.apache.flink.compiler.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:194)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:561)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
at org.apache.flink.client.program.Client.run(Client.java:285)
at org.apache.flink.client.program.Client.run(Client.java:230)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)

Thanks!
Cheers,
Max
Reply | Threaded
Open this post in threaded view
|

Re: The given strategy does not work on two inputs.

Stephan Ewen
Let us have a look...

On Thu, Nov 13, 2014 at 12:32 PM, Maximilian Alber <[hidden email]> wrote:
Hi Flinksters!

The current stable Flink compiler rejects my plan. But I don't have a clue why. The causing line of code is marked:

Code: 

def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], widthCandidates: DataSet[Vector], center: DataSet[Vector]): DataSet[Vector] = {
    val emptyDataSet = env.fromCollection[Vector](Seq())
    val costs = emptyDataSet.iterateDelta(widthCandidates, config.NWidthCandidates, Array("id")) {
      (solutionset, workset) =>
      val currentWidth = workset filter (new RichFilterFunction[Vector]{
       def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
      })

     val kernelVector = getKernelVector(X, center, currentWidth)

     val x1 = kernelVector dot residual map {x => x*x}
     val x2 = kernelVector dot kernelVector

     val cost = (x1 / x2) neutralize


     (cost map (new RichMapFunction[Vector, Vector]{
      def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
     }),
     workset)
    }

    val maxCost = costs max(0)
>>>>>>>>>>>>>>>>>>>>>>>>>
    val width = maxCost join widthCandidates where "id" equalTo "id" map {x => x._2}
>>>>>>>>>>>>>>>>>>>>>>>>>

    //val kernelVector = getKernelVector(X, center, width)

    //val x1 = kernelVector dot residual
    //val x2 = kernelVector dot kernelVector
    //val height = x1 / x2
    //costs
    width
}


The error message is:

java.lang.IllegalArgumentException: The given strategy does not work on two inputs.
at org.apache.flink.runtime.operators.DriverStrategy.secondDam(DriverStrategy.java:164)
at org.apache.flink.compiler.plan.DualInputPlanNode.hasDamOnPathDownTo(DualInputPlanNode.java:224)
at org.apache.flink.compiler.plan.WorksetIterationPlanNode.hasDamOnPathDownTo(WorksetIterationPlanNode.java:172)
at org.apache.flink.compiler.plan.SingleInputPlanNode.hasDamOnPathDownTo(SingleInputPlanNode.java:235)
at org.apache.flink.compiler.dag.TwoInputNode.placePipelineBreakersIfNecessary(TwoInputNode.java:585)
at org.apache.flink.compiler.dag.TwoInputNode.instantiate(TwoInputNode.java:546)
at org.apache.flink.compiler.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:497)
at org.apache.flink.compiler.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:431)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
at org.apache.flink.compiler.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:194)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:561)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
at org.apache.flink.client.program.Client.run(Client.java:285)
at org.apache.flink.client.program.Client.run(Client.java:230)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)

Thanks!
Cheers,
Max