Reading from HBase problem


Hilmi Yildirim
Hi,
I implemented a simple Flink batch job which reads from an HBase cluster
of 13 machines containing nearly 100 million rows. The HBase version is
1.0.0-cdh5.4.1, so I imported hbase-client 1.0.0-cdh5.4.1.
I implemented a flatMap which creates a tuple ("a", 1L) for each row.
Then I use groupBy(0).sum(1).writeAsText. The result should be the
number of rows, but it is not correct. I ran the job multiple
times and the result fluctuates by +-5. I also ran the job on a smaller
table with 100,000 rows, and there the result is correct.

Does anyone know the reason for that?

Best Regards,
Hilmi

--
Hilmi Yildirim
Software Developer R&D

http://www.neofonie.de

Visit the Neo Tech Blog for users:
http://blog.neofonie.de/

Follow us:
https://plus.google.com/+neofonie
http://www.linkedin.com/company/neofonie-gmbh
https://www.xing.com/companies/neofoniegmbh

Neofonie GmbH | Robert-Koch-Platz 4 | 10115 Berlin
Commercial register Berlin-Charlottenburg: HRB 67460
Managing director: Thomas Kitlitschko


Re: Reading from HBase problem

rmetzger0
Hi Hilmi,

if you just want to count the number of elements, you can also use accumulators, as described here [1].
They are much more lightweight.

So you need to make your flatMap function a RichFlatMapFunction, then call getRuntimeContext() to register the accumulator.
Use a long accumulator to count the elements.

If the results with the accumulator are consistent (the exact element count), then there is a severe bug in Flink. But I suspect that the accumulator will give you the same result (off by +-5).

Best,
Robert
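
To make the accumulator idea concrete, here is a minimal sketch in plain Java. It does not use the Flink API; it only models the mechanism, under the assumption that each parallel task keeps its own local long counter and that the partial counts are merged once the job finishes. The names AccumulatorCountSketch, LocalCounter and countRows are made up for this illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of accumulator-style counting; this is NOT Flink API. */
final class AccumulatorCountSketch {

    /** Hypothetical stand-in for a per-task long accumulator. */
    static final class LocalCounter {
        private long value;

        void add(long delta) {
            value += delta;
        }

        long get() {
            return value;
        }
    }

    /**
     * Each inner list models the rows seen by one parallel task.
     * Every task increments its own counter; the partial counts are merged at the end.
     */
    static long countRows(List<List<String>> partitions) {
        long merged = 0L;
        for (List<String> partition : partitions) {
            LocalCounter counter = new LocalCounter();
            for (String ignored : partition) {
                counter.add(1L); // one increment per row, no grouping or shuffle involved
            }
            merged += counter.get();
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("row-1", "row-2", "row-3"),
                Arrays.asList("row-4"),
                Arrays.asList("row-5", "row-6"));
        System.out.println(countRows(partitions)); // prints 6
    }
}
```

Because the count never passes through groupBy/sum, a wrong total from this kind of counter would point at the rows being read rather than at the aggregation.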



On Mon, Jun 8, 2015 at 3:04 PM, Hilmi Yildirim wrote:
> [...]

Re: Reading from HBase problem

Fabian Hueske-2
Hi Hilmi,

I see two possible reasons:

1) The data source / InputFormat is not working properly, so not all HBase records are read/forwarded, or
2) The aggregation / count is buggy.

Robert's suggestion uses an alternative mechanism to do the count. In fact, you can count with groupBy(0).sum() and accumulators at the same time.
If both counts are the same, this indicates that the aggregation is correct and hints that the HBase format is faulty.

In any case, it would be very good to know your findings. Please keep us updated.

One more hint: if you want to do a full aggregate, you don't have to use a "dummy" key like "a". Instead, you can work with Tuple1<Long> and directly call sum(0) without doing the groupBy().

Best, Fabian
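
The hint about the dummy key can be illustrated with a small plain-Java sketch. It uses java.util.stream rather than Flink, so it only mirrors the shape of the two variants: grouping every row under the constant key "a" and summing, versus summing the ones directly. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Plain-Java comparison of a keyed count and a keyless full aggregate; NOT Flink API. */
final class FullAggregateSketch {

    /** Every row becomes ("a", 1L); the ones are summed per key, as in groupBy(0).sum(1). */
    static long countWithDummyKey(List<String> rows) {
        Map<String, Long> sums = rows.stream()
                .collect(Collectors.groupingBy(row -> "a", Collectors.summingLong(row -> 1L)));
        return sums.getOrDefault("a", 0L);
    }

    /** Every row becomes 1L and the ones are summed directly, with no key at all. */
    static long countWithoutKey(List<String> rows) {
        return rows.stream().mapToLong(row -> 1L).sum();
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("row-1", "row-2", "row-3", "row-4");
        System.out.println(countWithDummyKey(rows)); // prints 4
        System.out.println(countWithoutKey(rows));   // prints 4
    }
}
```

Both variants give the same total; the keyless one simply avoids carrying a key that holds no information.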

2015-06-08 17:36 GMT+02:00 Robert Metzger:
> [...]

Re: Reading from HBase problem

Hilmi Yildirim
Hi,
I have now tested the "count" method. It returns the same result as the flatmap.groupBy(0).sum(1) method.

Furthermore, the HBase table contains nearly 100 million rows but the result is 102 million. This means that the HBase input format reads more rows than the table contains.

Best Regards,
Hilmi

On 08.06.2015 at 23:29, Fabian Hueske wrote:
> [...]

Re: Reading from HBase problem

Fabian Hueske-2
OK, so the problem seems to be with the HBase InputFormat.

I guess this issue needs a bit of debugging.
We need to check whether records are emitted twice (or more often) and, if so, which records.
Unfortunately, this issue only seems to occur with large tables :-(

Did I get that right: the HBase format returns about 2M (~2%) more records than are contained in the HBase table?

Cheers, Fabian
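
The duplicate check described above could look roughly like the following plain-Java sketch. It assumes the row keys emitted by the input format are available as strings, and it only shows the logic on a small in-memory sample; on the real table the same counting would have to run inside the job. DuplicateRowKeySketch and findDuplicates are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Reports which row keys were emitted more than once, and how often. */
final class DuplicateRowKeySketch {

    static Map<String, Integer> findDuplicates(Iterable<String> emittedRowKeys) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String key : emittedRowKeys) {
            counts.merge(key, 1, Integer::sum); // count every emission of this row key
        }
        counts.values().removeIf(n -> n < 2);   // keep only keys seen at least twice
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> duplicates =
                findDuplicates(Arrays.asList("r1", "r2", "r1", "r3", "r1"));
        System.out.println(duplicates); // prints {r1=3}
    }
}
```

Knowing which keys repeat would show whether the extra records cluster in particular key ranges or are spread across the whole table.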

2015-06-09 10:34 GMT+02:00 Hilmi Yildirim:
> [...]

Re: Reading from HBase problem

Hilmi Yildirim
Correct.

I also counted the rows with Spark and Hive. Both returned the same value, which is nearly 100 million rows. But Flink returns 102 million rows.

Best Regards,
Hilmi

On 09.06.2015 at 10:47, Fabian Hueske wrote:
> [...]

Re: Reading from HBase problem

Hilmi Yildirim
I want to add that I run the Flink job on a cluster with 13 machines, each with 13 processing slots, which gives a total of 169 processing slots.

On 09.06.2015 at 10:59, Hilmi Yildirim wrote:
> [...]

Re: Reading from HBase problem

Fabian Hueske-2
Would you mind opening a JIRA for this issue?

-> https://issues.apache.org/jira/browse/FLINK

I can do it as well, but you know all the details.

Thanks, Fabian

2015-06-09 11:03 GMT+02:00 Hilmi Yildirim:
> [...]

Re: Reading from HBase problem

Hilmi Yildirim
Done
https://issues.apache.org/jira/browse/FLINK-2188

On 09.06.2015 at 11:26, Fabian Hueske wrote:
> [...]

Re: Reading from HBase problem

Fabian Hueske-2
Thank you very much!

On Tuesday, 9 June 2015 at 11:40, Hilmi Yildirim wrote:
> [...]

Re: Reading from HBase problem

Ufuk Celebi
Hey Hilmi,

thanks for reporting the issue. Sorry for the inconvenience this has caused. I'm not familiar with HBase in combination with Flink. From what I've seen, there are two options: either use Flink's TableInputFormat from flink-addons or the Hadoop TableInputFormat, right? Which one are you using?

– Ufuk

On 09 Jun 2015, at 11:53, [hidden email] wrote:

> Thank you very much!
>
> From: Hilmi Yildirim
> Sent: ‎Tuesday‎, ‎9‎. ‎June‎, ‎2015 ‎11‎:‎40
> To: [hidden email]
>
> Done
> https://issues.apache.org/jira/browse/FLINK-2188
>
> Am 09.06.2015 um 11:26 schrieb Fabian Hueske:
> Would you mind opening a JIRA for this issue?
>
> -> https://issues.apache.org/jira/browse/FLINK
>
> I can do it as well, but you know all the details.
>
> Thanks, Fabian
>
> 2015-06-09 11:03 GMT+02:00 Hilmi Yildirim <[hidden email]>:
> I want to add that I run the Flink job on a cluster of 13 machines, each with 13 processing slots, for a total of 169 processing slots.
>
>
> On 09.06.2015 at 10:59, Hilmi Yildirim wrote:
> Correct.
>
> I also counted the rows with Spark and Hive. Both returned the same value, nearly 100 million rows, but Flink returns 102 million rows.
>
> Best Regards,
> Hilmi
>
> On 09.06.2015 at 10:47, Fabian Hueske wrote:
> OK, so the problem seems to be with the HBase InputFormat.
>
> I guess this issue needs a bit of debugging.
> We need to check if records are emitted twice (or more often) and if that is the case which records.
> Unfortunately, this issue only seems to occur with large tables :-(
>
> Did I get that right: the HBase format returns about 2M (~2%) more records than are contained in the HBase table?
>
> Cheers, Fabian
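
A minimal sketch of the check suggested above, finding out whether records are emitted more than once and which ones. It is plain Java with no Flink or HBase dependency and only shows the counting logic on an in-memory sample; the class name, the method name, and the row keys are made up for illustration. In a real job the same idea would presumably be a grouping on the row key followed by a count and a filter.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DuplicateRowKeys {

    /** Returns every row key that occurs more than once, mapped to its number of occurrences. */
    static Map<String, Long> findDuplicates(List<String> rowKeys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : rowKeys) {
            // Add 1 to the running count of this key (starting at 1 for a new key).
            counts.merge(key, 1L, Long::sum);
        }
        // Keep only the keys that were emitted twice or more often.
        counts.values().removeIf(n -> n < 2);
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical sample: "row-2" is emitted twice, "row-4" three times.
        List<String> emitted =
                List.of("row-1", "row-2", "row-3", "row-2", "row-4", "row-4", "row-4");
        System.out.println(findDuplicates(emitted)); // prints {row-2=2, row-4=3}
    }
}
```

If the returned map is empty, no key was read twice and the surplus must come from somewhere else; otherwise the duplicated keys show which part of the table to look at.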
>
> 2015-06-09 10:34 GMT+02:00 Hilmi Yildirim <[hidden email]>:
> Hi,
> I have now tested the "count" method. It returns the same result as the flatmap.groupBy(0).sum(1) method.
>
> Furthermore, the HBase table contains nearly 100 million rows, but the result is 102 million. This means that the HBase input format reads more rows than the table contains.
>
> Best Regards,
> Hilmi
>
>
> On 08.06.2015 at 23:29, Fabian Hueske wrote:
> Hi Hilmi,
>
> I see two possible reasons:
>
> 1) The data source / InputFormat is not working properly, so not all HBase records are read/forwarded, or
> 2) The aggregation / count is buggy
>
> Robert's suggestion will use an alternative mechanism to do the count. In fact, you can count with groupBy(0).sum(1) and accumulators at the same time.
> If both counts are the same, this will indicate that the aggregation is correct and hint that the HBase format is faulty.
>
> In any case, it would be very good to know your findings. Please keep us updated.
>
> One more hint, if you want to do a full aggregate, you don't have to use a "dummy" key like "a". Instead, you can work with Tuple1<Long> and directly call sum(0) without doing the groupBy().
>
> Best, Fabian
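
The cross-check described above can be sketched in plain Java, without Flink: count the same records through two independent mechanisms and compare. The side counter stands in for an accumulator and the summed 1L values stand in for the aggregation; all class and method names here are hypothetical, and this is only an illustration of the reasoning, not the Flink API.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

class CountCrossCheck {

    /** Holds the two independently obtained counts. */
    static final class Counts {
        final long sideCounter; // stands in for an accumulator
        final long aggregated;  // stands in for sum() over the emitted 1L values

        Counts(long sideCounter, long aggregated) {
            this.sideCounter = sideCounter;
            this.aggregated = aggregated;
        }

        boolean agree() {
            return sideCounter == aggregated;
        }
    }

    static Counts countBothWays(List<String> records) {
        LongAdder sideCounter = new LongAdder();
        long aggregated = records.stream()
                .mapToLong(record -> {
                    sideCounter.increment(); // side channel: one tick per record seen
                    return 1L;               // the value the aggregation adds up
                })
                .sum();
        return new Counts(sideCounter.sum(), aggregated);
    }

    public static void main(String[] args) {
        // Hypothetical input in which "row-2" was delivered twice by the source.
        Counts c = countBothWays(List.of("row-1", "row-2", "row-2", "row-3"));
        System.out.println(c.sideCounter + " " + c.aggregated + " " + c.agree()); // prints 4 4 true
    }
}
```

Both counts agree at 4 although only three distinct rows exist, which is the situation discussed in this thread: matching counts point away from the aggregation and toward the input.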
>
> 2015-06-08 17:36 GMT+02:00 Robert Metzger<[hidden email]>:
> Hi Hilmi,
>
> if you just want to count the number of elements, you can also use accumulators, as described here [1].
> They are much more lightweight.
>
> So you need to make your flatMap function a RichFlatMapFunction, then call getRuntimeContext().
> Use a long accumulator to count the elements.
>
> If the results with the accumulator are consistent (the exact element count), then there is a severe bug in Flink. But I suspect that the accumulator will give you the same result (off by ±5).
>
> Best,
> Robert
>
>
> [1]: http://slideshare.net/robertmetzger1/apache-flink-hands-on
>
> On Mon, Jun 8, 2015 at 3:04 PM, Hilmi Yildirim<[hidden email]> wrote:
> Hi,
> I implemented a simple Flink Batch job which reads from an HBase Cluster of 13 machines and with nearly 100 million rows. The hbase version is 1.0.0-cdh5.4.1. So, I imported hbase-client 1.0.0-cdh5.4.1.
> I implemented a flatMap which creates a tuple ("a", 1L) for each row. Then I use groupBy(0).sum(1).writeAsText. The result should be the number of rows, but it is not correct: I ran the job multiple times and the result fluctuates by ±5. I also ran the job on a smaller table with 100,000 rows, and there the result is correct.
>
> Does anyone know the reason for that?
>
> Best Regards,
> Hilmi
>


Re: Reading from HBase problem

Hilmi Yildirim
Hi Ufuk,
I used the TableInputFormat from flink-addons.

Best Regards,
Hilmi

On 09.06.2015 at 13:17, Ufuk Celebi wrote:

> Hey Hilmi,
>
> thanks for reporting the issue. Sorry for the inconvenience this has caused. I'm not familiar with HBase in combination with Flink. From what I've seen, there are two options: either use Flink's TableInputFormat from flink-addons or the Hadoop TableInputFormat, right? Which one are you using?
>
> – Ufuk
>
