Different CoGroup behavior inside DeltaIteration

Kien Truong
Hi,

When running CoGroup between the solution set and a different dataset
inside a DeltaIteration, the CoGroupFunction is only called for items
that exist in the other dataset, similar to an inner join. This is not
the documented behavior for CoGroup:

> If a DataSet has a group with no matching key in the other DataSet,
> the CoGroupFunction is called with an empty group for the non-existing
> group.

The following code shows the problem.

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object CoGroupExample {

  // Emits the matching element of the second input, or reports that the
  // key has no counterpart in the second input.
  def coGroupFunction(first: Iterator[(Int, Int)],
                      second: Iterator[(Int, Int)],
                      out: Collector[(Int, Int)]): Unit = {
    if (second.hasNext) {
      out.collect(second.next())
    } else {
      printf("Not in second set: %s\n", first.next())
      println("These two lines do not appear when " +
        "running cogroup on solution set")
    }
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging()

    val d1 = env.fromElements((1, 1), (2, 1), (3, 1))

    // Delta iteration: d1 is both the initial solution set and the initial
    // work set, keyed on field 0, with at most one iteration.
    d1.iterateDelta(d1, 1, Array(0)) {
      (solutionSet, workSet) => {
        // Key 1 is dropped from the work set, so it only exists in the solution set.
        val f = workSet.filter(_._1 != 1)
        println("Cogroup on solution set with delta iteration")
        val newSolutionSet = solutionSet.coGroup(f)
          .where(0)
          .equalTo(0)
          .apply(coGroupFunction _)
        (newSolutionSet, newSolutionSet)
      }
    }.print()

    // Same CoGroup outside of an iteration, for comparison.
    println("Normal cogroup")
    val d2 = d1.filter(_._1 != 1)
    d1.coGroup(d2).where(0).equalTo(0).apply(coGroupFunction _).print()
  }
}



Is this the expected behavior, or should I file a bug about this?

Best regards,
Kien Truong

Re: Different CoGroup behavior inside DeltaIteration

Fabian Hueske-2
Hi,

this is an artifact of how the solution set is internally implemented. Usually, a CoGroup is executed using a sort-merge strategy, i.e., both inputs are sorted, merged, and handed to the CoGroup function in a streaming fashion. Both inputs are treated equally, and if one of the two inputs does not contain a key which is contained in the other input, the CoGroup function is called with an empty iterator for that input.

The solution set of a delta iteration is stored in a hash table (with only one entry per key). When a solution set is coGrouped with another data set, the other input is sorted and probed against the hash table. The solution set iterator of the CoGroup function will contain one element if the hash table contains an entry for the key and will be empty if it doesn't. However, the hash table does not track which of its entries have been looked up, so it cannot identify elements of the solution set for which no corresponding element was present in the other data set.

So, the CoGroup with a solution set follows the documented behavior in only one direction: the function is called with an empty solution set group when a key of the other input is missing from the solution set, but it is not called for solution set keys that are missing from the other input. This is kind of intended by the way the solution set CoGroup is implemented, but we should definitely update the documentation to cover this case!
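
The difference between the two strategies can be illustrated without Flink. The following is only a minimal sketch in plain Scala collections, not the actual runtime code: the object name `CoGroupStrategies` and both function names are made up for illustration, and each returned triple stands for one call of the CoGroup function (key, group from the first input, group from the second input).

```scala
object CoGroupStrategies {
  type Rec = (Int, Int)

  // Regular CoGroup semantics: every key that occurs in either input leads to
  // one call; the side that lacks the key contributes an empty group.
  def sortMergeCoGroup(left: Seq[Rec], right: Seq[Rec]): Seq[(Int, Seq[Rec], Seq[Rec])] = {
    val l = left.groupBy(_._1)
    val r = right.groupBy(_._1)
    (l.keySet ++ r.keySet).toSeq.sorted.map { k =>
      (k, l.getOrElse(k, Seq.empty), r.getOrElse(k, Seq.empty))
    }
  }

  // Solution set semantics: only the keys of the other input drive calls.
  // Each key is probed against the hash table (modelled here as a Map);
  // entries that are never probed are never seen by the function.
  def solutionSetCoGroup(solution: Map[Int, Rec], other: Seq[Rec]): Seq[(Int, Seq[Rec], Seq[Rec])] =
    other.groupBy(_._1).toSeq.sortBy(_._1).map { case (k, group) =>
      (k, solution.get(k).toList, group)
    }

  def main(args: Array[String]): Unit = {
    val d1 = Seq((1, 1), (2, 1), (3, 1))
    val d2 = d1.filter(_._1 != 1)
    // Keys 1, 2 and 3 are visited; key 1 has an empty second group.
    println(sortMergeCoGroup(d1, d2).map(_._1))
    // Only keys 2 and 3 are visited; key 1 is never probed.
    println(solutionSetCoGroup(d1.map(r => r._1 -> r).toMap, d2).map(_._1))
  }
}
```

With the data from the original example, key 1 only exists in the solution set, so the probe-based variant never produces a call for it.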

If you have a use case that requires a solution set CoGroup with the missing behavior, you should open a JIRA issue.
Otherwise, it would be great if you could open a JIRA issue to extend the documentation.

Thank you, Fabian

In reply to Truong Duc Kien's message of 2015-11-16 1:02 GMT+01:00.

Re: Different CoGroup behavior inside DeltaIteration

Stephan Ewen
It is actually very important that the CoGroup in delta iterations works like that.
If the CoGroup touched every element in the solution set, the "decreasing work" effect would not happen.

The delta iterations are designed for cases where specific updates to the solution are made, driven by the workset.
Driving an operator by solution set contents would result in a "bulk iteration" style pattern, so the idea would be to use a proper bulk iteration for those cases.

Does that make sense?
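
The "decreasing work" effect can be sketched in plain Scala, again without Flink and only as an illustration under stated assumptions. The example propagates the minimum vertex id through a small graph, a common delta iteration use case; the graph, the object name `DecreasingWorkSketch`, and the lookup counter are all hypothetical. Only keys that appear in the candidates derived from the workset are looked up in the solution set, and the loop ends once the workset is empty.

```scala
object DecreasingWorkSketch {
  // Hypothetical undirected graph with two components: {1, 2, 3} and {4, 5}.
  val edges: Seq[(Int, Int)] = Seq((1, 2), (2, 3), (4, 5))
  val neighbors: Map[Int, Seq[Int]] =
    (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }

  // Returns the final labels and the number of solution set lookups per superstep.
  def run(): (Map[Int, Int], List[Int]) = {
    // Solution set: vertex -> smallest label seen so far (initially its own id).
    var solution: Map[Int, Int] = neighbors.keys.map(v => v -> v).toMap
    // Workset: vertices whose label changed in the previous superstep.
    var workset: Seq[(Int, Int)] = solution.toSeq
    var lookups = List.empty[Int]
    while (workset.nonEmpty) {
      // Each changed vertex offers its label to its neighbors; keep the minimum per target.
      val candidates: Map[Int, Int] = workset
        .flatMap { case (v, label) => neighbors(v).map(n => n -> label) }
        .groupBy(_._1)
        .map { case (v, cs) => v -> cs.map(_._2).min }
      // Only candidate keys are probed against the solution set.
      lookups = lookups :+ candidates.size
      val updates = candidates.toSeq.filter { case (v, label) => label < solution(v) }
      solution = solution ++ updates
      workset = updates // an empty workset ends the iteration
    }
    (solution, lookups)
  }

  def main(args: Array[String]): Unit = {
    val (labels, lookups) = run()
    println(labels.toSeq.sorted)
    println(lookups) // lookups per superstep: 5, 4, 1
  }
}
```

If every superstep touched all five solution set entries instead, the work per superstep would stay constant, which is the bulk iteration pattern.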



In reply to Fabian Hueske's message of Mon, Nov 16, 2015 at 10:54 PM.

Re: Different CoGroup behavior inside DeltaIteration

Kien Truong

Hi,
Thanks for the suggestion. I'm trying to use the delta iteration so that I get the empty work set convergence criterion for free. But since an outer join between the work set and the solution set is not possible using CoGroup, I will try to adapt my algorithm to use the bulk iteration.

Best, 
Kien Truong

In reply to Stephan Ewen's message of Mon, Nov 16, 2015 at 11:02 PM.