inconsistency in count and print

inconsistency in count and print

Michele Bertoni
Hi,
for two days I have been going mad over a problem: every time I run my code (on the same dataset) I get a different result.

While debugging, I found the following.

I have this code:

val aggregationResult  = //something that creates the dataset and uses join, group, reduce and map
logger.error("res count " + aggregationResult.count)
aggregationResult.print



The logger reports a dataset size of 7, but the printed output contains 6 elements.

This happens randomly: sometimes the printed result is larger than the count, and sometimes both are correct at 10.



Flink version: 0.9 milestone 1

Any idea what could make it non-deterministic?
Thanks for your help.

Re: inconsistency in count and print

Michele Bertoni
Could this be caused by the hasher?

In my code I was using the Google Guava hasher (SHA-256, taken as a Long hash).
Sometimes it threw errors (ArrayIndexOutOfBoundsException), and sometimes I simply got a different hash for the same id, especially when running on a non-local execution environment.

I removed it everywhere and switched to Java's hashCode; now it seems to work.
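
As a rough way to test this hypothesis outside Flink, one could check that a hash helper returns the same value for the same input on every call. The sketch below is an assumption, not the code from this thread: `StableHash.sha256AsLong` is a hypothetical helper built on the JDK's `MessageDigest` rather than on Guava, and it reads the first 8 digest bytes in big-endian order, which need not match the value Guava's `asLong` returns. The file name is made up.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object StableHash {
  // Hypothetical helper (not from the thread): SHA-256 of a string, reduced to a Long.
  def sha256AsLong(s: String): Long = {
    // A MessageDigest instance is stateful and not thread-safe,
    // so a fresh one is created for every call.
    val digest = MessageDigest.getInstance("SHA-256")
      .digest(s.getBytes(StandardCharsets.UTF_8))
    // Interpret the first 8 of the 32 digest bytes as a big-endian Long.
    ByteBuffer.wrap(digest, 0, 8).getLong
  }

  def main(args: Array[String]): Unit = {
    val name = "sample.bed" // hypothetical file name
    // Hash the same input many times and count the distinct results.
    val hashes = (1 to 1000).map(_ => sha256AsLong(name)).toSet
    println(s"distinct hashes for the same input: ${hashes.size}")
  }
}
```

If a check like this ever reported more than one distinct value, the hash helper itself would be the likely culprit; if not, the cause is probably elsewhere in the program.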



Re: inconsistency in count and print

Fabian Hueske-2
Invalid hash values can certainly cause non-deterministic results.

Can you provide a code snippet that shows how and where you used the Guava Hasher?


Re: inconsistency in count and print

Michele Bertoni
The first time I hash my data is in the reading phase: a field containing the hash of the file name is added to each line. I do this with a custom reader that extends DelimitedInputFormat and overrides the open, nextRecord and readRecord methods.

/* … */
private var id : Long = 0L

override def open(split : FileInputSplit) = {
    super.open (split)
    //TODO hasher problem: guava fails, java hashcode works
    //val hf : HashFunction = Hashing.sha256()
    //id = hf.newHasher.putString(split.getPath.getName.toString, Charsets.UTF_8).hash.asLong
    id = (split.getPath.getName.toString).hashCode.toLong
  }

override def readRecord(reusable : (FlinkRegionType), bytes : Array[Byte], offset : Int, numBytes : Int) : (FlinkRegionType) = {
    (parser(id, new String(bytes.slice(offset,offset+numBytes), Charset.forName(charsetName))))
  }

override def nextRecord(record : FlinkRegionType) : FlinkRegionType = {
    try{
      super.nextRecord(record)
    } catch {
       case e : ParsingException => {
         logger.info("Region Data format error in the tuple: " + e.getMessage)
         nextRecord(record)
       }
    }
  }

/* … */



Then, every time I join two datasets or want to aggregate (groupBy) on several fields of the tuple, I create a new hash from the concatenation of the respective ids.

val joinResult : DataSet[(Long, String, Long, Long, Char, Array[GValue], List[Array[GValue]], Int, Long)] =
      ref
        .joinWithHuge(exp).where(0,2).equalTo(0,2){
          (r : (String, Int, Int, Long, String, Long, Long, Char, Array[GValue]), x : (String, Int, Int, Long, String, Long, Long, Char, Array[GValue]), out : Collector[(Long, String, Long, Long, Char, Array[GValue], List[Array[GValue]], Int, Long)]) => {
            if(/* regions cross */) {

              //TODO hasher problem: guava fails, java hashcode works
              //val hashId = hf.newHasher().putString(r._4.toString + x._4.toString, Charsets.UTF_8).hash.asLong
              val hashId = (r._4.toString + x._4.toString).hashCode.toLong

              //TODO hasher problem: guava fails, java hashcode works
              //val aggregationId = hf.newHasher().putString(hashId.toString + r._5.toString + r._6.toString + r._7.toString + r._8.toString + r._9.map((g) => g.toString).sorted.reduce(_ + _).toString, Charsets.UTF_8).hash.asLong
              val aggregationId = (hashId.toString + r._5.toString + r._6.toString + r._7.toString + r._8.toString + r._9.map((g) => g.toString).sorted.reduce(_ + _).toString).hashCode.toLong
              out.collect(hashId, r._5, r._6, r._7, r._8, r._9, List(x._9), 1, aggregationId)
            }
        }
    }


This is just an example. I have two kinds of data: the one shown above is the core data, and then there is the metadata, which is associated with the core data via the same hash of the original file name.
On the metadata I have similar joining, grouping and re-hashing functionality.
Again, with Java's hashCode (see above) everything seems to work.
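
One thing that may be worth checking in the join above, independent of the hash function: concatenating ids with no separator is ambiguous, so different id pairs can produce the same string and therefore the same hash. The sketch below illustrates this with made-up values. `combineIds` is a hypothetical helper, and the `|` separator is an assumption that only holds if that character can never appear inside an id. Note also that `String.hashCode` is only 32 bits wide, so widening it with `toLong` does not lower the chance of collisions.

```scala
object CombinedKey {
  // Naive concatenation, in the style of the join above.
  def concatIds(a: Long, b: Long): String = a.toString + b.toString

  // Hypothetical alternative: keep the boundary between the two ids explicit.
  def combineIds(a: Long, b: Long): String = a.toString + "|" + b.toString

  def main(args: Array[String]): Unit = {
    // (1, 23) and (12, 3) are different pairs of ids.
    println(concatIds(1L, 23L) == concatIds(12L, 3L))   // true: both are "123"
    println(combineIds(1L, 23L) == combineIds(12L, 3L)) // false: "1|23" vs "12|3"
  }
}
```

This would not by itself explain results that change between runs on the same data, but it can silently merge groups that should stay separate.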





Re: inconsistency in count and print

Michele Bertoni
I forgot to mention that I was importing Guava this way:

import com.google.common.hash.{HashFunction, Hashing}

with Guava added as a Maven dependency.

I also had the option of using

import org.apache.flink.shaded.com.google.common.hash.{HashFunction, Hashing}

Neither of them works properly.



Re: inconsistency in count and print

Stephan Ewen
Hi Michele!

I cannot tell you what the problem is at first glance, but here are some pointers that may help you find it:

Input split creation determinism:
  - The number of input splits is not really deterministic. It depends on the parallelism of the source (which tells the system the minimum number of splits it should create).
  - The splits themselves may also not be strictly deterministic; they can be influenced by the order in which files in directories get enumerated.

==> The records that a specific split contains may vary between two runs. I am not sure whether that is an issue in your implementation, since the id there is the hash of the file name, which should not change.


Input split assignment determinism:
  - The input splits are assigned dynamically at runtime to the data sources. Through that dynamic assignment, Flink balances between slower and faster sources and tries to get the best locality.

==> Not every split is processed by the same subtask of the data source in two successive runs.


Maybe these pointers help you rethink some of your assumptions. I would suspect you make some assumption that is not guaranteed by the system.
It would be good if you shared a bit more of your code, such as the flow of the program (what functions follow which functions).

If you do not want to share the code publicly on this mailing list, you can send me a private mail at [hidden email]


Greetings,
Stephan
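
The two pointers above can be illustrated with a small model that does not use Flink at all. Everything in this sketch is hypothetical: `Split`, `idFromFileName` and `idFromSubtask` are made-up names, and the two "runs" are hand-written assignments of splits to subtasks. It only shows that a value derived from the file name is the same however splits are assigned, while a value derived from the subtask that happened to read the split is not.

```scala
object SplitAssignmentSketch {
  // Hypothetical model of an input split: a byte range of one file.
  final case class Split(fileName: String, start: Long)

  // Depends only on the file name, so it is the same in every run.
  def idFromFileName(split: Split): Long = split.fileName.hashCode.toLong

  // Depends on which subtask happened to read the split, so it can change between runs.
  def idFromSubtask(subtaskIndex: Int): Long = subtaskIndex.toLong

  def main(args: Array[String]): Unit = {
    val splits = Seq(Split("a.bed", 0L), Split("a.bed", 1024L), Split("b.bed", 0L))
    // Two made-up runs in which the same splits land on different subtasks.
    val run1 = splits.zip(Seq(0, 1, 0))
    val run2 = splits.zip(Seq(1, 0, 1))

    val stable = run1.map { case (s, _) => idFromFileName(s) } ==
      run2.map { case (s, _) => idFromFileName(s) }
    val unstable = run1.map { case (_, t) => idFromSubtask(t) } ==
      run2.map { case (_, t) => idFromSubtask(t) }

    println(s"file-name ids equal across runs: $stable")
    println(s"subtask ids equal across runs: $unstable")
  }
}
```

Any per-record state that is kept in the input format or in a function instance, and that depends on which splits that instance has already seen, falls into the second category.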


