Issue with parallelism when CoGrouping custom nested data type

Pieter Hameete
Dear fellow Flinkers,

I am implementing queries from the XMark (http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a custom nested data type. Reading the XML data generated by the XMark generator into my custom nested datatype works perfectly, and the queries that I have implemented so far using mostly map, reduce and filter produce correct results.

For the next query I wish to cogroup a dataset containing person data with a dataset containing auction data, joined on the person's id and the id of the auction's buyer, so that I can count the number of purchases per person. I select this person id as the key from the custom nested data type in the where and equalTo functions of the coGroup. The XML2DawnInputFormat is my custom input format that reads XML into my custom nested data type DawnData. The 'inputGraph' and 'auctionInput' are projections on the XML input that prevent reading unnecessary data.
def env = ExecutionEnvironment.getExecutionEnvironment
def persons : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(inputGraph), path)
def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
def result = persons.coGroup(auctions)
  .where(person => { person.select("2/@id/2") })
  .equalTo(auction => { auction.select("2/buyer/@person/2") })
  .apply((personsres, auctionsres, out : Collector[DawnData]) => {
    // my cogroup function here that outputs the name of the person and the number of auctions
  })
  .setParallelism(1)
This code works fine with parallelism set to 1 as above. My issue is that if I raise the parallelism of the coGroup above 1 the data will get mixed up. Often the auctions Iterator will be empty, and sometimes there are non-empty auction iterators passed to the cogroup function where the persons iterator is empty, but this is impossible because all buyers exist in the persons database!

If anyone has pointers as to why this code starts producing strange results when parallelism is set above 1, they would be greatly appreciated :-)

Kind regards.

Pieter Hameete


Re: Issue with parallelism when CoGrouping custom nested data type

Till Rohrmann-2
Hi Pieter,

your code doesn't look suspicious at first glance. Would it be possible for you to post a complete example with data (it can also be included in the code) to reproduce your problem?

Cheers,
Till

(In reply to Pieter Hameete's message of Wed, Sep 16, 2015 at 10:31 AM)

Re: Issue with parallelism when CoGrouping custom nested data type

Fabian Hueske-2
This sounds like a problem with your custom type and its (presumably) custom serializers and comparators.

I assume it is not an issue of partitioning or sorting because Reduce is working fine, as you reported.
CoGroup also partitions and sorts data, but in addition it compares the elements of two sorted streams.

I would check the following methods:
- extractKeys
- getFlatComparators
- duplicate (duplicate must return a deep copy, esp. of all nested comparators)

Feel free to share your custom TypeInfo, Comparator, and Serializers.

Cheers, Fabian
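
For readers unfamiliar with the idea of comparing two sorted streams, here is a minimal sketch of a sort-merge co-group in plain Scala. It is an illustration only, not Flink's implementation; the object name `SortMergeCoGroup` and the tuple-based inputs are assumptions made for this example. It shows how a key that is present on only one side yields a group whose other side is empty, which is the symptom described in the first message.

```scala
// Illustrative sketch only: NOT Flink's implementation.
// Co-groups two keyed sequences by sorting both and merging them key by key.
object SortMergeCoGroup {
  def coGroup[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)])
                      (implicit ord: Ordering[K]): List[(K, Seq[A], Seq[B])] = {
    val l = left.sortBy(_._1).toVector
    val r = right.sortBy(_._1).toVector
    val out = List.newBuilder[(K, Seq[A], Seq[B])]
    var i = 0
    var j = 0
    while (i < l.length || j < r.length) {
      // The smallest key not yet processed on either side.
      val key =
        if (i >= l.length) r(j)._1
        else if (j >= r.length) l(i)._1
        else ord.min(l(i)._1, r(j)._1)
      // Collect the run of elements carrying that key from each side.
      // A side that does not contain the key contributes an empty group.
      val ls = l.drop(i).takeWhile(e => ord.equiv(e._1, key)).map(_._2)
      val rs = r.drop(j).takeWhile(e => ord.equiv(e._1, key)).map(_._2)
      i += ls.length
      j += rs.length
      out += ((key, ls, rs))
    }
    out.result()
  }
}
```

In a partitioned run, each parallel instance performs such a merge only on the records routed to it, so if two records with the same logical key are sent to different instances, each instance sees one side of the group as empty.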

(In reply to Till Rohrmann's message of 2015-09-16 10:52 GMT+02:00)

Re: Issue with parallelism when CoGrouping custom nested data type

Pieter Hameete
Cheers Till and Fabian for your fast replies, it's much appreciated!

I figured something must be wrong with my data type. I have no doubt the CoGroup works just fine :-) What I am looking for are pointers on what to investigate in my data type. Initially I had problems with serialization that caused strange issues as well. These were resolved after I rewrote my serialization, so I believe that part is working OK.

I'll try looking into the data type some more with your tips. If I can't figure it out, I'll share the repository with you later today or tomorrow.

Kind regards,

Pieter





(In reply to Fabian Hueske's message of 2015-09-16 11:02 GMT+02:00)

Re: Issue with parallelism when CoGrouping custom nested data type

Pieter Hameete
Hi,

I haven't been able to find the problem yet, and I don't know exactly how to check the methods you suggested earlier (extractKeys, getFlatComparators, duplicate) in the Scala API. Do you have some pointers on how I can check these myself?

In my earlier mail I stated that maps, filters and reduces work fine. I found that this was not correct: for my previous queries I have only used maps and filters. I ran an extra test and found that the following code using a reduce also generates faulty results when parallelism is increased past 1:
def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
def test = auctions
  .groupBy(_.select("2.buyer.@person").getFirst)
  .reduceGroup((groupedauctions, out : Collector[DawnData]) => {
    out.collect(new DawnData(groupedauctions.size))
  })
  .setParallelism(1)
test.print
Does this indicate that something else could be wrong with the custom datatype?

You can find the corresponding code and a small dataset at https://github.com/PHameete/dawn-flink in the development branch. It is a Scala Maven project, so you should be able to run the main.scala.wis.dawnflink.performance.DawnBenchmarkSuite class out of the box to run the query from my first email. In this class you can also change the query that's being run or run multiple queries. If this does not work please let me know!

Kind regards and cheers again!

- Pieter
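
One check that needs no knowledge of Flink internals is whether keys that compare as equal also report the same hash code. The helper below is a minimal sketch of such a check in plain Scala; `KeyContractCheck` and `violations` are names invented for this example, and the `Ordering` stands in for whatever comparison the key type defines (for instance via `compareTo`).

```scala
// Hypothetical diagnostic helper, not part of Flink or of the dawn-flink project.
object KeyContractCheck {
  // Returns every pair of sample keys that compare as equal under `ord`
  // but report different hash codes. Such pairs can be routed to different
  // partitions by any hash-based partitioner.
  def violations[K](keys: Seq[K])(implicit ord: Ordering[K]): Seq[(K, K)] = {
    val indexed = keys.zipWithIndex
    for {
      (a, i) <- indexed
      (b, j) <- indexed
      if i < j && ord.equiv(a, b) && a.hashCode != b.hashCode
    } yield (a, b)
  }
}
```

Feeding it a sample of keys that were extracted separately from both inputs (so that equal keys are distinct objects) should return an empty result for a well-behaved key type. A non-empty result points at a hashCode that is inconsistent with the comparison.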




(In reply to Pieter Hameete's message of 2015-09-16 11:24 GMT+02:00)

Re: Issue with parallelism when CoGrouping custom nested data type

Fabian Hueske-2
Sorry, I was overcomplicating things. Forget about the methods I mentioned.

If you are implementing WritableComparable types, you need to override the hashCode() method.
Flink treats WritableComparable types just like Hadoop [1].
DawnData does not override hashCode(), which causes inconsistent hash partitioning.

Please let me know if that solves your problem.

Cheers, Fabian
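
To illustrate the fix described above, here is a minimal sketch in plain Scala. `PersonKey` is a hypothetical stand-in (the real DawnData class is not shown in this thread), and `PartitionDemo.partition` is a deliberately simplified partitioner, not Flink's actual channel selection. The point is only that hashCode is derived from the same content that compareTo and equals use, so two separately created keys with the same id map to the same partition.

```scala
// Hypothetical stand-in for a comparable key type; NOT the real DawnData.
final class PersonKey(val id: String) extends Comparable[PersonKey] {
  override def compareTo(other: PersonKey): Int = id.compareTo(other.id)

  override def equals(other: Any): Boolean = other match {
    case that: PersonKey => id == that.id
    case _               => false
  }

  // Derived from the same field that compareTo and equals use.
  // Without this override the inherited identity-based hash would be used,
  // which generally differs between two objects with the same id.
  override def hashCode: Int = id.hashCode
}

object PartitionDemo {
  // Simplified hash partitioner, for illustration only.
  def partition(key: Any, parallelism: Int): Int =
    java.lang.Math.floorMod(key.hashCode, parallelism)
}
```

This also suggests why the job looked correct with parallelism 1: with a single partition every hash value maps to partition 0, so an inconsistent hashCode has no visible effect.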

(In reply to Pieter Hameete's message of 2015-09-16 14:12 GMT+02:00)

Re: Issue with parallelism when CoGrouping custom nested data type

Pieter Hameete
Fantastic Fabian, that was it :-)! I'm glad it wasn't a more severe or tricky programming error, though I had already spent quite some time wondering about this one.

Have a nice day!

- Pieter



(In reply to Fabian Hueske's message of 2015-09-16 14:27 GMT+02:00)

Re: Issue with parallelism when CoGrouping custom nested data type

Fabian Hueske-2
Cool!

Always happy to help :-)

(In reply to Pieter Hameete's message of 2015-09-16 14:41 GMT+02:00)