Hi,
When running a CoGroup between the solution set and a different dataset inside a DeltaIteration, the CoGroupFunction only gets called for items that exist in the other dataset, similar to an inner join. This is not the documented behavior for CoGroup:

> If a DataSet has a group with no matching key in the other DataSet,
> the CoGroupFunction is called with an empty group for the non-existing
> group.

The following code shows the problem.

    import org.apache.flink.api.scala._
    import org.apache.flink.util.Collector

    object CoGroupExample {

      def coGroupFunction(first: Iterator[(Int, Int)],
                          second: Iterator[(Int, Int)],
                          out: Collector[(Int, Int)]): Unit = {
        if (second.hasNext) {
          out.collect(second.next())
        } else {
          // Reached only for keys that exist in `first` but not in `second`
          printf("Not in second set: %s\n", first.next())
          println("These two lines don't appear when " +
            "running cogroup on solution set")
        }
      }

      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        env.getConfig.disableSysoutLogging()

        val d1 = env.fromElements((1, 1), (2, 1), (3, 1))

        // Delta iteration keyed on field 0, running a single superstep
        d1.iterateDelta(d1, 1, Array(0)) { (solutionSet, workSet) =>
          val f = workSet.filter(_._1 != 1)
          println("Cogroup on solution set with delta iteration")
          val newSolutionSet = solutionSet.coGroup(f)
            .where(0)
            .equalTo(0)
            .apply(coGroupFunction _)
          (newSolutionSet, newSolutionSet)
        }.print()

        println("Normal cogroup")
        val d2 = d1.filter(_._1 != 1)
        d1.coGroup(d2).where(0).equalTo(0).apply(coGroupFunction _).print()
      }
    }

Is this the expected behavior, or should I file a bug about this?

Best regards,
Kien Truong
Hi,

This is an artifact of how the solution set is internally implemented. Usually, a CoGroup is executed using a sort-merge strategy, i.e., both inputs are sorted, merged, and handed to the CoGroup function in a streaming fashion. Both inputs are treated equally, and if one of the two inputs does not contain a key that is contained in the other input, the CoGroup function is called with an empty iterator. The solution set, in contrast, is kept in a hash table, so a CoGroup with the solution set is driven by the other input: each of its groups is looked up in the solution set, and solution set keys without a match in the other input are never handed to the CoGroup function.

If you have a use case that requires a solution set CoGroup with the missing behavior, you should open a JIRA issue. Otherwise, it would be great if you could also open a JIRA issue to extend the documentation.

2015-11-16 1:02 GMT+01:00 Truong Duc Kien <[hidden email]>:
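The two execution strategies described in this reply can be modeled with in-memory collections. This is a simplified plain-Scala sketch, not Flink's runtime code: `CoGroupStrategies`, `sortMerge` and `probeSolutionSet` are hypothetical names, keys are assumed to be `Int`, and the solution set is assumed to hold at most one element per key.

```scala
object CoGroupStrategies {

  // Hypothetical sort-merge model of a regular CoGroup: both inputs are sorted
  // by key and merged; a key missing on one side yields an empty iterator there.
  def sortMerge[A, B, O](left: Seq[A], right: Seq[B])(lk: A => Int, rk: B => Int)(
      f: (Iterator[A], Iterator[B]) => Seq[O]): Seq[O] = {
    val ls = left.sortBy(lk).toVector
    val rs = right.sortBy(rk).toVector
    val out = Vector.newBuilder[O]
    var i = 0
    var j = 0
    while (i < ls.length || j < rs.length) {
      // Smallest key still pending on either side.
      val key =
        if (j >= rs.length) lk(ls(i))
        else if (i >= ls.length) rk(rs(j))
        else math.min(lk(ls(i)), rk(rs(j)))
      val i0 = i
      while (i < ls.length && lk(ls(i)) == key) i += 1
      val j0 = j
      while (j < rs.length && rk(rs(j)) == key) j += 1
      out ++= f(ls.slice(i0, i).iterator, rs.slice(j0, j).iterator)
    }
    out.result()
  }

  // Hypothetical hash-probe model of a CoGroup with the solution set: the
  // solution set is a map keyed on the iteration key, and only keys occurring
  // in the other input are looked up, so unmatched entries are never visited.
  def probeSolutionSet[S, B, O](solution: Map[Int, S], other: Seq[B])(rk: B => Int)(
      f: (Iterator[S], Iterator[B]) => Seq[O]): Seq[O] =
    // Groups are sorted only to make the output order deterministic.
    other.groupBy(rk).toSeq.sortBy(_._1).flatMap { case (k, group) =>
      f(solution.get(k).iterator, group.iterator)
    }
}
```

With `Seq((1, 1), (2, 1), (3, 1))` as the solution set and the same data filtered by `_._1 != 1` as the other input, `sortMerge` calls the function for keys 1, 2 and 3, while `probeSolutionSet` calls it only for keys 2 and 3, mirroring the behavior reported in the question.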
It is actually very important that the CoGroup in delta iterations works like that. If the CoGroup touched every element in the solution set, the "decreasing work" effect would not happen.

The delta iterations are designed for cases where specific updates to the solution set are made, driven by the workset. Driving an operator by the solution set contents would result in a "bulk iteration" style pattern, so the idea would be to use a proper bulk iteration for those cases.

Does that make sense?

On Mon, Nov 16, 2015 at 10:54 PM, Fabian Hueske <[hidden email]> wrote:
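The "decreasing work" argument can be made concrete with a plain-Scala model that does not use Flink: min-label propagation (a common delta-iteration example) run once in delta style and once in bulk style. `DeltaVsBulk`, `delta` and `bulk` are hypothetical names; the model only counts how many solution-set entries each superstep touches. In Flink itself, the first variant would roughly correspond to `iterateDelta` with a workset-driven operator against the solution set, and the second to `DataSet.iterate`.

```scala
object DeltaVsBulk {
  // Undirected graph as adjacency lists; every vertex is assumed to appear
  // in the initial label map.
  type Edges = Map[Int, Seq[Int]]

  // Delta style: only workset vertices send their label to neighbours, and only
  // improved labels are written back. Returns the final labels and, per
  // superstep, how many distinct solution-set keys were probed for updates.
  def delta(edges: Edges, init: Map[Int, Int]): (Map[Int, Int], Seq[Int]) = {
    var solution = init
    var workset: Set[Int] = init.keySet // initially every vertex is active
    val touched = Seq.newBuilder[Int]
    while (workset.nonEmpty) {
      val candidates = for {
        v <- workset.toSeq
        n <- edges.getOrElse(v, Nil)
      } yield n -> solution(v)
      val updates = candidates
        .groupBy(_._1)
        .map { case (n, cs) => n -> cs.map(_._2).min }
        .filter { case (n, l) => l < solution(n) } // keep only real improvements
      touched += candidates.map(_._1).distinct.size
      solution = solution ++ updates
      workset = updates.keySet // next workset: only vertices that changed
    }
    (solution, touched.result())
  }

  // Bulk style: every vertex is recomputed in every superstep until nothing
  // changes, so the work per superstep never shrinks.
  def bulk(edges: Edges, init: Map[Int, Int]): (Map[Int, Int], Seq[Int]) = {
    var labels = init
    var changed = true
    val touched = Seq.newBuilder[Int]
    while (changed) {
      val next = labels.map { case (v, l) =>
        v -> (l +: edges.getOrElse(v, Nil).map(labels)).min
      }
      touched += labels.size
      changed = next != labels
      labels = next
    }
    (labels, touched.result())
  }
}
```

On the path graph 1-2-3-4 with each vertex initially labeled with its own id, the delta variant probes 4, 4, 3 and 1 solution-set keys in its four supersteps, while the bulk variant touches all 4 vertices in each of its four supersteps.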
Hi,

Best,

On Mon, Nov 16, 2015 at 11:02 PM, Stephan Ewen <[hidden email]> wrote: