Hi,
for two days I have been going mad with a problem: every time I run the code (on the same dataset) I get a different result.

While debugging I found this. I have this code:

val aggregationResult = //something that creates the dataset and uses join, group, reduce and map
logger.error("res count " + aggregationResult.count)
aggregationResult.print

The logger prints a dataset size of 7, but the printed output is made of 6 elements. This happens randomly: sometimes the result is larger than the count, and sometimes they are both correct at 10.

Flink version: 0.9-milestone-1.

Any idea of what can make it "not deterministic"?
Thanks for the help.
Is it possible that it is due to the hasher?
Inside my code I was using the Google Guava hasher (SHA-256 as a Long hash). Sometimes I got errors from it (ArrayIndexOutOfBoundsException) and sometimes I just got different hashes for the same id, especially when running on a non-local execution environment. I removed it everywhere and started using the Java hashCode, and now it seems to work.
Invalid hash values can certainly cause non-deterministic results. Can you provide a code snippet that shows how and where you used the Guava Hasher?

2015-05-16 11:52 GMT+02:00 Michele Bertoni <[hidden email]>:
> Is it possible that it is due to the hasher?
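To make the failure mode concrete, here is a small, purely illustrative Scala sketch (not taken from this thread): if the key used for a groupBy or join is not a stable function of the record, equal records can be routed to different partitions, so the number of groups, and therefore the result, can vary between runs.

import org.apache.flink.api.scala._
import scala.util.Random

object UnstableKeyDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val data = env.fromElements(("a", 1), ("a", 2), ("b", 3))

    // Hypothetical broken key: equal inputs may get different key values,
    // so records that should meet in one group can end up in different groups.
    def unstableKey(s: String): Long = s.hashCode.toLong + Random.nextInt(2)

    data
      .groupBy(t => unstableKey(t._1))
      .reduce((l, r) => (l._1, l._2 + r._2))
      .print() // the number of output elements can change from run to run
  }
}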
The first time I hash my data is in the reading phase: each line gets an extra field that is the hash of its file name. I do this with a custom reader that extends DelimitedInputFormat and overrides the open, nextRecord and readRecord methods:
/* … */
private var id : Long = 0L
override def open(split : FileInputSplit) = {
super.open (split)
//TODO hasher problem: guava fails, java hashcode works
//val hf : HashFunction = Hashing.sha256()
//id = hf.newHasher.putString(split.getPath.getName.toString, Charsets.UTF_8).hash.asLong
id = (split.getPath.getName.toString).hashCode.toLong
}
override def readRecord(reusable : (FlinkRegionType), bytes : Array[Byte], offset : Int, numBytes : Int) : (FlinkRegionType) = {
(parser(id, new String(bytes.slice(offset,offset+numBytes), Charset.forName(charsetName))))
}
override def nextRecord(record : FlinkRegionType) : FlinkRegionType = {
try{
super.nextRecord(record)
} catch {
case e : ParsingException => {
logger.info("Region Data format error in the tuple: " + e.getMessage)
nextRecord(record)
}
}
}
/* … */
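For context, a minimal sketch of how such a custom format could be plugged into a job. The class name RegionInputFormat is hypothetical and stands for the custom DelimitedInputFormat sketched above; FlinkRegionType is the record type it produces.

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.Path

val env = ExecutionEnvironment.getExecutionEnvironment

// RegionInputFormat is assumed to be the custom reader shown above
val format = new RegionInputFormat()
format.setFilePath(new Path("file:///data/regions/")) // directory of region files

val regions: DataSet[FlinkRegionType] = env.createInput(format)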
Then every time I join two datasets, or want to aggregate (groupBy) by several different fields of the tuple, I create a new hash of the concatenation of the respective ids:
val joinResult : DataSet[(Long, String, Long, Long, Char, Array[GValue], List[Array[GValue]], Int, Long)] =
ref
.joinWithHuge(exp).where(0,2).equalTo(0,2){
(r : (String, Int, Int, Long, String, Long, Long, Char, Array[GValue]),
 x : (String, Int, Int, Long, String, Long, Long, Char, Array[GValue]),
 out : Collector[(Long, String, Long, Long, Char, Array[GValue], List[Array[GValue]], Int, Long)]) => {
if(/* regions cross */) {
//TODO hasher problem: guava fails, java hashcode works
//val hashId = hf.newHasher().putString(r._4.toString + x._4.toString, Charsets.UTF_8).hash.asLong
val hashId = (r._4.toString + x._4.toString).hashCode.toLong
//TODO hasher problem: guava fails, java hashcode works
//val aggregationId = hf.newHasher().putString(hashId.toString + r._5.toString + r._6.toString + r._7.toString + r._8.toString + r._9.map((g) => g.toString).sorted.reduce(_ + _).toString, Charsets.UTF_8).hash.asLong
val aggregationId = (hashId.toString + r._5.toString + r._6.toString + r._7.toString + r._8.toString + r._9.map((g) => g.toString).sorted.reduce(_ + _).toString).hashCode.toLong
out.collect((hashId, r._5, r._6, r._7, r._8, r._9, List(x._9), 1, aggregationId))
}
}
}
This is just an example. I have two kinds of data: the one I showed is the core data, and then there is the metadata, associated to the core data via the same hash of the original file name. On the metadata I have similar joining, grouping and re-hashing functionality.

Again, with the Java hashCode (see above) everything seems to work.
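For comparison, a minimal sketch of how the same ids could be computed with Guava through a single call per value, without keeping a Hasher or HashFunction around between calls. Whether that is actually related to the errors described above is only an assumption; the helper name stableId is made up for illustration.

import com.google.common.base.Charsets
import com.google.common.hash.Hashing

// Hypothetical helper: a fresh SHA-256 computation per input string.
def stableId(s: String): Long =
  Hashing.sha256().hashString(s, Charsets.UTF_8).asLong()

// e.g. stableId(split.getPath.getName) or stableId(r._4.toString + x._4.toString)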
I forgot: I was importing Guava in this way

import com.google.common.hash.{HashFunction, Hashing}

including it in Maven, but I also had the opportunity to use

import org.apache.flink.shaded.com.google.common.hash.{HashFunction, Hashing}

Neither of them works properly.
Hi Michele!

I cannot tell you what the problem is at a first glance, but here are some pointers that may help you find it.

Input split creation determinism:
- The number of input splits is not really deterministic. It depends on the parallelism of the source (this tells the system how many splits it should create at least).
- The splits themselves may also not be strictly deterministic; they can be influenced by the order in which files in directories get enumerated.
==> The records that a specific split contains may vary between two runs. I am not sure if that is an issue in your implementation, as the hash is of the file name, which should not change.

Input split assignment determinism:
- The input splits are assigned dynamically at runtime to the data sources. Through that dynamic assignment, Flink balances between slower and faster sources and tries to get the best locality.
==> Not every split is processed by the same subtask of the data source in two successive runs.

Maybe these pointers help you rethink some of your assumptions. I would suspect you make some assumption that is not guaranteed by the system.

It would be good if you shared a bit more of your code, such as the flow of the program (what functions follow which functions). If you do not want to share the code publicly on this mailing list, you can send me a private mail at [hidden email].

Greetings,
Stephan
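A small, purely illustrative sketch of the kind of assumption that breaks under dynamic split assignment (none of these class names come from the thread): an id derived from the subtask index, or from the order in which splits happen to arrive, is not stable across runs, whereas an id derived only from the record content (or the file name carried inside it) is.

import org.apache.flink.api.common.functions.RichMapFunction

// NOT stable: ties the id to whichever parallel subtask happened to process the record.
class SubtaskIdMapper extends RichMapFunction[String, (Int, String)] {
  override def map(value: String): (Int, String) =
    (getRuntimeContext.getIndexOfThisSubtask, value)
}

// Stable: the id depends only on the value itself, so it is the same in every run.
class ContentIdMapper extends RichMapFunction[String, (Long, String)] {
  override def map(value: String): (Long, String) =
    (value.hashCode.toLong, value)
}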