Union limit

boci
Hi guys!

I have one input (from MongoDB) and I split the incoming data into multiple DataSets (each created dynamically from configuration). Before I write back the result, I want to merge them into one DataSet, because there is some common transformation.
So the flow is:

DataSet from MongoDB =>
Create mappers dynamically (currently 74), so I have 74 DataSets =>
Custom filter and mapping on each DataSet =>
Union them dynamically into one (every mapper result has the same type) =>
Another common transformation =>
Count the result

But when I union more than 64 DataSets, I get this exception:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: Cannot currently handle nodes with more than 64 outputs.
at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:348)
at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202)
at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:268)
at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:82)

I tried to split the incoming list of 74 DataSets into two groups (60 + 14), union each group and apply an identity mapper to it, and then union the resulting DataSets, but with no success:

val listOfDataSet: List[DataSet[...]] = ....

listOfDataSet
.sliding(60,60)
.map(dsg => dsg.reduce((ds1,ds2) => ds1.union(ds2)).map(new IdMapper()))
// Now there is an Iterator of DataSets
.reduce((dsg1,dsg2) => dsg1.union(dsg2)) // Here I got the exception
.map(finalDataSet => ... some transformation ...)
.count()
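As a quick check of the grouping arithmetic: when the window size equals the step, `sliding(60, 60)` yields non-overlapping groups just like `grouped(60)`, so 74 inputs become one group of 60 and one of 14. The plain-Scala sketch below uses integers as stand-ins for the DataSets (the object name `GroupSizes` is made up for illustration), so it runs without Flink.

```scala
object GroupSizes {
  // Integers stand in for the 74 DataSets created from the configuration
  val inputs: List[Int] = (1 to 74).toList

  // Window size 60 with step 60: non-overlapping groups; the last one is shorter
  val slidingSizes: List[Int] = inputs.sliding(60, 60).map(_.size).toList

  // grouped(60) yields the same partition
  val groupedSizes: List[Int] = inputs.grouped(60).map(_.size).toList

  def main(args: Array[String]): Unit = {
    println(slidingSizes) // List(60, 14)
    println(groupedSizes) // List(60, 14)
  }
}
```

Both calls give the same partition here; `grouped(n)` just states the intent more directly.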

Is there any way to solve this?

Thanks
b0c1

Re: Union limit

Fabian Hueske-2
Hi b0c1,

This is a limitation of Flink's optimizer.
Internally, all binary unions are merged into a single n-ary union. The optimizer restricts the number of inputs for an operator to 64.

You can work around this limitation with an identity mapper, which prevents the union operators from being merged:

in1  --\
in2  ---+-- Id-Map --+
...     |            |
in14 --/             |
in15 ----------------+-- NextOp
...                  |
in74 ----------------+

This is not a very nice solution, but it is the only one that comes to my mind.
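The merging rule can be illustrated with a toy model in plain Scala. It does not use Flink: the `Plan` types, `flattenUnions` and `maxUnionFanIn` are hypothetical names, and the model only encodes the behavior described above (directly nested unions are merged into one n-ary union, while a map between them stops the merge).

```scala
object UnionModel {
  // Toy logical plan: a source, an n-ary union, or a single-input map
  sealed trait Plan
  final case class Source(name: String) extends Plan
  final case class Union(inputs: List[Plan]) extends Plan
  final case class MapOp(input: Plan) extends Plan

  // Merge directly nested unions into one n-ary union; a MapOp in between stops the merge
  def flattenUnions(p: Plan): Plan = p match {
    case Source(_)     => p
    case MapOp(input)  => MapOp(flattenUnions(input))
    case Union(inputs) =>
      Union(inputs.map(flattenUnions).flatMap {
        case Union(inner) => inner
        case other        => List(other)
      })
  }

  // Largest number of inputs of any union in the plan
  def maxUnionFanIn(p: Plan): Int = p match {
    case Source(_)     => 0
    case MapOp(input)  => maxUnionFanIn(input)
    case Union(inputs) => (inputs.size :: inputs.map(maxUnionFanIn)).max
  }

  val sources: List[Plan] = (1 to 74).toList.map(i => Source(s"in$i"))

  // Chain of binary unions, as produced by reduce(_ union _)
  def chainUnion(ps: List[Plan]): Plan = ps.reduce((a, b) => Union(List(a, b)))

  // All 74 inputs unioned directly: flattened into a single 74-input union
  val naive: Int = maxUnionFanIn(flattenUnions(chainUnion(sources)))

  // Diagram shape: in1..in14 -> union -> identity map, then union with in15..in74
  val (first, rest) = sources.splitAt(14)
  val workaround: Int =
    maxUnionFanIn(flattenUnions(chainUnion(MapOp(chainUnion(first)) :: rest)))

  def main(args: Array[String]): Unit = {
    println(naive)      // 74
    println(workaround) // 61
  }
}
```

With the 14 + 60 split from the diagram, the largest merged union has 61 inputs (the identity-mapped branch plus in15 to in74), which stays under the limit of 64.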

Cheers, Fabian

(In reply to boci, 2017-08-28 23:29 GMT+02:00)

Re: Union limit

boci
Dear Fabian,

Thanks for your answer (I think you said the same on StackOverflow), but as you can see in my code, your solution does not work:

Here is the code: it splits the DataSets into lists (each list contains at most 60 DataSets).
After that, I reduce each list using union, apply an IdMapper, and return the identity-mapped DataSet.
But in the next reduce (where I want to merge the identity-mapped DataSets), Flink says I have reached the limit.

Maybe my IdMapper is wrong... Can you show a correctly working IdMapper?

b0c1

PS:
Here is the code segment:
listOfDataSet
.sliding(60,60)
.map(dsg => dsg.reduce((ds1,ds2) => ds1.union(ds2)).map(new IdMapper()))
// Now there is an Iterator of DataSets
.reduce((dsg1,dsg2) => dsg1.union(dsg2)) // Here I got the exception
.map(finalDataSet => ... some transformation ...)
.count()



(In reply to Fabian Hueske, Wed, 30 Aug 2017 at 15:44)

Re: Union limit

Fabian Hueske-2
Hi,

The following code should do what you want.
I included an implementation of an IdMapper.
At the end, I print the execution plan, which is generated after optimization (so the program works at least through the optimization step).

Best, Fabian

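import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// 314 small example inputs, all with the same element type
val data: Seq[Seq[Int]] = (1 until 315).map(i => Seq(1, 2, 3))

val dataSets: Seq[DataSet[Int]] = data.map(env.fromCollection(_))

dataSets.sliding(60, 60)
  // union each group of at most 60 DataSets, then apply the identity mapper
  // so that the group unions are not merged into one n-ary union
  .map(dsg => dsg.reduce( (ds1: DataSet[Int], ds2: DataSet[Int]) => ds1.union(ds2)).map(new IdMapper[Int]()))
  // union the few identity-mapped group results
  .reduce( (dsg1: DataSet[Int], dsg2: DataSet[Int]) => dsg1.union(dsg2))
  .map(x => x * 2) // do something with the union result
  .output(new DiscardingOutputFormat[Int])

println(env.getExecutionPlan())

// identity mapper: returns every record unchanged
class IdMapper[T] extends MapFunction[T, T] {
  override def map(value: T): T = value
}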
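If the number of inputs keeps growing, the number of groups could itself exceed the limit. One way to handle that, sketched here under the assumption of a maximum fan-in of 64, is to apply the grouping recursively. `unionTree`, `Shape` and the other names are hypothetical; a toy `Shape` value stands in for a DataSet and tracks how many inputs each merged union would have, so the sketch runs without Flink.

```scala
object UnionTree {
  // Union all items while keeping every merged union at or below maxFanIn inputs.
  // Groups of at most maxFanIn items are unioned and passed through `barrier`
  // (the role of the identity map); the barrier outputs are grouped again if needed.
  def unionTree[A](items: Seq[A], maxFanIn: Int)(union: (A, A) => A, barrier: A => A): A = {
    require(items.nonEmpty, "need at least one input")
    require(maxFanIn >= 2, "maxFanIn must be at least 2")
    if (items.size <= maxFanIn) items.reduce(union)
    else {
      val groups = items.grouped(maxFanIn).map(g => barrier(g.reduce(union))).toList
      unionTree(groups, maxFanIn)(union, barrier)
    }
  }

  // Toy stand-in for a DataSet: `open` is the input count of the current merged
  // union, `worst` is the largest union fan-in seen anywhere upstream
  final case class Shape(open: Int, worst: Int)
  val source: Shape = Shape(open = 1, worst = 0)
  val unionShapes: (Shape, Shape) => Shape = (a, b) =>
    Shape(a.open + b.open, List(a.worst, b.worst, a.open + b.open).max)
  // A map after a union stops further merging: it counts as a single input
  val idMap: Shape => Shape = s => Shape(open = 1, worst = s.worst)

  val direct: Int = Seq.fill(74)(source).reduce(unionShapes).worst
  val tree74: Int = unionTree(Seq.fill(74)(source), 60)(unionShapes, idMap).worst
  val tree5000: Int = unionTree(Seq.fill(5000)(source), 64)(unionShapes, idMap).worst

  def main(args: Array[String]): Unit = {
    println(direct)   // 74
    println(tree74)   // 60
    println(tree5000) // 64
  }
}
```

With Flink's Scala API, the same helper could presumably be called as `unionTree(dataSets, 60)((a, b) => a.union(b), ds => ds.map(new IdMapper[Int]()))`; that Flink call is an untested assumption. Each recursion level shrinks the input count by roughly a factor of `maxFanIn`, so only a few levels of identity mappers are needed even for thousands of inputs.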

(In reply to boci, 2017-08-31 12:30 GMT+02:00)