reduce error

reduce error

Michele Bertoni
Hi everybody, I am facing a very strange problem.
Sometimes when I run my program, one line of the result is totally wrong, and if I repeat the execution with the same input, the result can change.


The algorithm takes two datasets and executes something like a left outer join where,
if there is a match,
 - it increments a counter on the left tuple
 - it adds some values of the right tuple to an array of the left tuple
In the end it outputs the left value with the final counter and the list of values (printer lines A and B).
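The join described above can be sketched with plain Scala collections. This is only an illustration, not the code from the job: `Ref`, `Exp`, `Result` and the interval-overlap rule in `overlaps` are hypothetical stand-ins, since the real match clause is not shown in the post.

```scala
object LeftJoinCountSketch {
  // Hypothetical, simplified stand-ins for the real tuples in the post.
  final case class Ref(id: Long, start: Long, stop: Long)
  final case class Exp(start: Long, stop: Long, values: Vector[Double])
  final case class Result(ref: Ref, count: Int, extra: Vector[Vector[Double]])

  // Assumed match rule (the real clause is elided in the post): intervals overlap.
  def overlaps(r: Ref, e: Exp): Boolean =
    r.start < e.stop && e.start < r.stop

  // Every reference is emitted exactly once, matched or not (left-outer style),
  // together with the number of matches and the values collected from them.
  def joinCount(refs: Seq[Ref], exps: Seq[Exp]): Seq[Result] =
    refs.map { r =>
      val matched = exps.filter(e => overlaps(r, e))
      Result(r, matched.size, matched.map(_.values).toVector)
    }
}
```

A reference with no matching experiment still appears in the output, with a counter of 0 and an empty list of values.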

Then there is a group reduce phase that merges elements from different groups (there can be elements to merge, but not always).

Finally there is a map that applies some custom functions (printer lines C and D).




My problem is that between printers B and C the data actually changes for no reason.

For example, in line B the counter is 7 and in line C it is 1.82E9; it always goes to very high numbers, from E9 to E18, while it is expected to be smaller than 20.
Moreover, the array of extra data becomes null.


What I found is that:
It never happens if the reduce is actually executed (when there are many elements in a group), but only when there is one element per group (so the reduce should not change the data in a tuple).
I am running on a small dataset of 60k lines and, if it happens, it always happens on the same line of the input.
If I change the dataset it happens on another line, but always on the same one for that dataset.

So far it has always happened on only one line for each dataset (I am using a local environment with parallelism 1).
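The remark that a single-element group leaves the reduce function with nothing to do can be checked on plain Scala collections. The sketch below is not Flink code; `reduceCounting` is a hypothetical helper that counts how often the combine function is invoked.

```scala
object SingletonReduceSketch {
  // Returns the reduced value and how many times the combine function ran.
  def reduceCounting[T](group: Seq[T])(f: (T, T) => T): (T, Int) = {
    var calls = 0
    val result = group.reduce[T] { (a, b) =>
      calls += 1
      f(a, b)
    }
    (result, calls)
  }
}
```

For a one-element group, `reduceCounting(Seq(7))(_ + _)` yields `(7, 0)`: the element passes through and the function is never called. This suggests, though it does not prove, that the corrupted values in the post do not come from the user-defined reduce function itself.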




I tried replacing the reduce with a reduceGroup and, inside the reduceGroup, a pure Scala reduce; this worked perfectly (so far).
I also added two groupBy and reduceGroup blocks (that take the line and put it in the output) before and after the groupBy-reduce; in both cases I faced no problem at all.
It only seems to appear when using the group and reduce.
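The shape of that workaround can be sketched locally, without Flink. The record type `Rec` is a hypothetical simplification (id, extra data, counter, grouping key), and `run` merely emulates grouping followed by a function that consumes each group's whole iterator, which is the role the reduceGroup body plays in the post.

```scala
object ReduceGroupWorkaroundSketch {
  // Hypothetical simplified record: (id, extra data, counter, grouping key).
  type Rec = (Long, Vector[List[Int]], Int, Long)

  // Same merge logic as a pairwise reduce: concatenate extras, add counters.
  def merge(r1: Rec, r2: Rec): Rec =
    (r1._1, r1._2.zip(r2._2).map { case (a, b) => a ++ b }, r1._3 + r2._3, r1._4)

  // Body of the group-wise function: a pure Scala reduce over the group.
  def reduceWholeGroup(group: Iterator[Rec]): Rec =
    group.reduce((a, b) => merge(a, b))

  // Local emulation of "group by key, then reduce each group as a whole".
  def run(records: Seq[Rec]): Map[Long, Rec] =
    records.groupBy(_._4).map { case (key, group) =>
      key -> reduceWholeGroup(group.iterator)
    }
}
```

A group with a single record comes out unchanged, and groups with several records are merged pairwise inside the group function.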

Any idea what the problem could be? It is driving me mad.


Thanks for the help.



This is the code I use (there are a lot of println calls for debugging):

groupedRef // expID, bin, chromosome
  .coGroup(groupedExp).where(1,3,5).equalTo(2,1,3){
    (references : Iterator[(Long, Long, Int, Int, Long, String, Long, Long, Char, Array[GValue], Long)], experiments : Iterator[(Int, Int, Long, String, Long, Long, Char, Array[GValue])], out : Collector[(Long, String, Long, Long, Char, Array[GValue], Array[List[GValue]], Int, Long)]) => {
      val refCollected : List[PartialResult] = references.map((r) => (new PartialResult(r, 0, extraData ))).toList
      for(e <- experiments){
        for(r <- refCollected){
          if( /* complex clause not important */ ) {
            r.count += 1
            r.extra = r.extra :+ e._8.foldLeft(new Array[List[GValue]](0))((z : Array[List[GValue]], v : GValue) => z :+ List(v))
          }
        }
      }

      refCollected.map((pr) => {

        if(pr.binnedRegion._1.equals(7773719163500064339L) && pr.binnedRegion._7.equals(4860L) && pr.binnedRegion._8.equals(5324L)){
          println("-A--------------------------------------- " + pr.toString())
        }
        val res = (pr.binnedRegion._1, pr.binnedRegion._6, pr.binnedRegion._7, pr.binnedRegion._8, pr.binnedRegion._9, pr.binnedRegion._10, pr.extra.reduce((a,b) => a.zip(b).map((p) => p._1 ++ p._2)), pr.count, pr.binnedRegion._11)

        if(res._1.equals(7773719163500064339L) && res._3.equals(4860L) && res._4.equals(5324L)){
          println("-B--------------------------------------- " + (res._1, res._2, res._3, res._4, res._5, res._6.mkString((" - ")), res._7.mkString(" - "), res._8, res._9 ))
        }

        out.collect(res)
      })

    }
  }
val aggregationResult : DataSet[(Long, String, Long, Long, Char, Array[GValue])] =
  coGroupResult
    //-----------------------------------------------------------------
    // field positions are zero-based, so 8 is the last Long of the tuple (_9)
    .groupBy(8)
    //reduce phase
    //concatenation of extra data
    .reduce(
      (r1,r2) => {
        val out = (r1._1, r1._2, r1._3, r1._4, r1._5, r1._6,
          r1._7
            .zip(r2._7)
            .map((a) => a._1 ++ a._2),
          r1._8 + r2._8, r1._9)

        out

      }
    )

    //-----------------------------------------------------------------
    .map((l : (Long, String, Long, Long, Char, Array[GValue], Array[List[GValue]], Int, Long)) => {

      if(l._1.equals(7773719163500064339L) && l._3.equals(4860L) && l._4.equals(5324L)){
        println("-C--------------------------------------- " + (l._1, l._2, l._3, l._4, l._5, l._6.mkString((" - ")), l._7.mkString(" - "), l._8, l._9 ))
      }

      val out =
        //something else that is working good

      if(out._1.equals(7773719163500064339L) && out._3.equals(4860L) && out._4.equals(5324L)){
        println("-D--------------------------------------- " + (out._1, out._2, out._3, out._4, out._5, out._6.mkString(" - ")))
      }

      out
    })
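The reduce function in the code above concatenates the extra data position by position with `zip` and `++`. A small stand-alone sketch of that step follows; it uses `Int` in place of `GValue`, and `mergeExtra` is a hypothetical name. One property worth keeping in mind is that `zip` truncates to the shorter array; whether the arrays in the real job always have equal length is not stated in the post.

```scala
object ZipMergeSketch {
  // Position-wise concatenation of two arrays of lists.
  // zip stops at the shorter array, so unmatched trailing positions are dropped.
  def mergeExtra(left: Array[List[Int]], right: Array[List[Int]]): Array[List[Int]] =
    left.zip(right).map { case (a, b) => a ++ b }
}
```

With arrays of equal length every position is preserved; with unequal lengths the result has only as many positions as the shorter input.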



Re: reduce error

Ufuk Celebi
Hey Michele! Sorry for the long delay on this. Can you share the code and data? The easiest way to reproduce the problem is to actually run the code with the data. It will also help a lot when trying to fix it. If you can't share publicly, feel free to mail me at uce at apache org.

– Ufuk

On Thu, Oct 8, 2015 at 9:53 AM, Michele Bertoni <[hidden email]> wrote:
> [...]




Re: reduce error

Till Rohrmann
Hi Michele, I will look into the problem. As Ufuk said, it would be really helpful if you could provide us with the data set. If it's problematic to share the data via the mailing list, you could also send it to me privately.

Thanks a lot for your help.

Cheers,
Till

On Fri, Oct 16, 2015 at 9:59 PM, Ufuk Celebi <[hidden email]> wrote:
> [...]