Hi guys!
I have one input (from Mongo) and I split the incoming data into multiple datasets (each created dynamically from configuration); before I write back the result, I want to merge them into one dataset again (there is a common transformation at the end). So the flow is:

    DataSet from Mongo
      => create mappers dynamically (currently 74), so I have 74 DataSets
      => custom filter and mapping on each dataset
      => union them dynamically into one (every mapper result has the same type)
      => some further common transformation
      => count the result

But when I want to union more than 64 datasets, I get this exception:

    Exception in thread "main" org.apache.flink.optimizer.CompilerException: Cannot currently handle nodes with more than 64 outputs.
        at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:348)
        at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202)
        at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:268)
        at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:82)

I tried to split the incoming list of 74 datasets into groups of 60 + 14, insert an id mapper after each partial union, and then union the results, but with no success:

    val listOfDataSet: List[DataSet[...]] = ...

    listOfDataSet
      .sliding(60, 60)
      .map(dsg => dsg.reduce((ds1, ds2) => ds1.union(ds2)).map(new IdMapper())) // this yields an iterator of DataSets
      .reduce((dsg1, dsg2) => dsg1.union(dsg2)) // here I get the exception
      .map(finalDataSet => ... some transformation ...)
      .count()

Is there any solution for this?

Thanks,
b0c1
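[For illustration, a minimal, self-contained sketch of the job shape that runs into this limit. Everything here — env, source, configs, and the filter/map bodies — is a hypothetical stand-in for the Mongo pipeline described above, not the actual code:]

    import org.apache.flink.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment

    // Stand-ins for the Mongo input and the 74 dynamically created mapper configs.
    val source: DataSet[String] = env.fromElements("a", "bb", "ccc")
    val configs: Seq[Int] = 1 to 74

    // One filtered + mapped branch per configuration; all branches have the same type.
    val branches: Seq[DataSet[String]] = configs.map { cfg =>
      source.filter(_.nonEmpty).map(s => s"$cfg:$s")
    }

    // Folding all 74 branches into one chain of binary unions is what the
    // optimizer rejects at plan-compilation time with the exception above.
    val unioned: DataSet[String] = branches.reduce(_ union _)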
Hi b0c1,

This is a limitation in Flink's optimizer: it cannot handle plans in which a node has more than 64 outgoing connections, and a union of this many DataSets runs into that limit. A workaround is to union the DataSets in groups of at most 64 and to put an identity mapper behind each partial union; the mapper outputs can then be unioned again:

    [plan diagram lost in the archive: inputs in1 ... in74 feed grouped unions, each partial union is followed by an identity mapper, and the mapper outputs are unioned again]

This is not a super nice solution, but the only way that comes to my mind.

Cheers,
Fabian

2017-08-28 23:29 GMT+02:00 boci <[hidden email]>:
Dear Fabian,

Thanks for your answer (I think you said the same on StackOverflow), but as you can see from my code, your solution does not work for me. The code splits the datasets into groups (each containing at most 60 datasets); after that I reduce each group with union, map it with an IdMapper, and get back the id-mapped DataSet. But in the next reduce (where I want to merge the id-mapped streams) Flink says I have hit the limit. Maybe my IdMapper is wrong... Can you show a correct, working IdMapper?

b0c1

ps: Here is the code segment:

    listOfDataSet
      .sliding(60, 60)
      .map(dsg => dsg.reduce((ds1, ds2) => ds1.union(ds2)).map(new IdMapper())) // this yields an iterator of DataSets
      .reduce((dsg1, dsg2) => dsg1.union(dsg2)) // here I get the exception
      .map(finalDataSet => ... some transformation ...)
      .count()

On Wed, 30 Aug 2017 at 15:44 Fabian Hueske <[hidden email]> wrote:
Hi,

the following code should do what you want. I included an implementation of an IdMapper. At the end, I print the execution plan, which is generated after the optimization (so the pipeline is working up to that point).

Best,
Fabian

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.java.io.DiscardingOutputFormat
    import org.apache.flink.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment

    val data: Seq[Seq[Int]] = (1 until 315).map(i => Seq(1, 2, 3))
    val dataSets: Seq[DataSet[Int]] = data.map(env.fromCollection(_))

    dataSets
      .sliding(60, 60)
      .map(dsg => dsg
        .reduce((ds1: DataSet[Int], ds2: DataSet[Int]) => ds1.union(ds2))
        .map(new IdMapper[Int]()))
      .reduce((dsg1: DataSet[Int], dsg2: DataSet[Int]) => dsg1.union(dsg2))
      .map(x => x * 2) // do something with the union result
      .output(new DiscardingOutputFormat[Int])

    println(env.getExecutionPlan())

    class IdMapper[T] extends MapFunction[T, T] {
      override def map(value: T): T = value
    }

2017-08-31 12:30 GMT+02:00 boci <[hidden email]>:
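[As a follow-up note: to actually execute the job instead of only printing the plan (the original goal was a count), the chain can end with count(), which eagerly triggers execution in the DataSet API. A sketch, reusing dataSets and IdMapper from the code above:]

    val total: Long = dataSets
      .sliding(60, 60)
      .map(_.reduce(_ union _).map(new IdMapper[Int]()))
      .reduce(_ union _)
      .map(x => x * 2) // some common transformation
      .count()         // triggers execution and returns the element count

    println(total)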