http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Reading-from-HBase-problem-tp1545p1566.html
> Hey Hilmi,
>
> thanks for reporting the issue. Sorry for the inconvenience this has caused. I'm not familiar with HBase in combination with Flink. From what I've seen, there are two options: either use Flink's TableInputFormat from flink-addons or the Hadoop TableInputFormat, right? Which one are you using?
>
> – Ufuk
>
> On 09 Jun 2015, at 11:53,
[hidden email] wrote:
>
>> Thank you very much!
>>
>> From: Hilmi Yildirim
>> Sent: Tuesday, 9. June, 2015 11:40
>> To:
[hidden email]
>>
>> Done
>>
https://issues.apache.org/jira/browse/FLINK-2188>>
>> Am 09.06.2015 um 11:26 schrieb Fabian Hueske:
>> Would you mind opening a JIRA for this issue?
>>
>> ->
https://issues.apache.org/jira/browse/FLINK>>
>> I can do it as well, but you know all the details.
>>
>> Thanks, Fabian
>>
>> 2015-06-09 11:03 GMT+02:00 Hilmi Yildirim <
[hidden email]>:
>> I want to add that I run the Flink job on a cluster with 13 machines and each machine has 13 processing slots which results in a total number of processing slots of 169.
>>
>>
>> Am 09.06.2015 um 10:59 schrieb Hilmi Yildirim:
>> Correct.
>>
>> I also counted the rows with Spark and Hive. Both returned the same value which is nearly 100 mio. rows. But Flink returns 102 mio. rows.
>>
>> Best Regards,
>> Hilmi
>>
>> Am 09.06.2015 um 10:47 schrieb Fabian Hueske:
>> OK, so the problem seems to be with the HBase InputFormat.
>>
>> I guess this issue needs a bit of debugging.
>> We need to check if records are emitted twice (or more often) and if that is the case which records.
>> Unfortunately, this issue only seems to occur with large tables :-(
>>
>> Did I got that right, that the HBase format returns about 2M (~2%) more records than are contained in the HBase table?
>>
>> Cheers, Fabian
>>
>> 2015-06-09 10:34 GMT+02:00 Hilmi Yildirim <
[hidden email]>:
>> Hi,
>> Now I tested the "count" method. It returns the same result as the flatmap.groupBy(0).sum(1) method.
>>
>> Furthermore, the Hbase contains nearly 100 mio. rows but the result is 102 mio.. This means that the HbaseInput reads more rows than the HBase contains.
>>
>> Best Regards,
>> Hilmi
>>
>>
>> Am 08.06.2015 um 23:29 schrieb Fabian Hueske:
>> Hi Hilmi,
>>
>> I see two possible reasons:
>>
>> 1) The data source / InputFormat is not properly working, so not all HBase records are read/forwarded, or
>> 2) The aggregation / count is buggy
>>
>> Roberts suggestion will use an alternative mechanism to do the count. In fact, you can count with groupBy(0).sum() and accumulators at the same time.
>> If both counts are the same, this will indicate that the aggregation is correct and hint that the HBase format is faulty.
>>
>> In any case, it would be very good to know your findings. Please keep us updated.
>>
>> One more hint, if you want to do a full aggregate, you don't have to use a "dummy" key like "a". Instead, you can work with Tuple1<Long> and directly call sum(0) without doing the groupBy().
>>
>> Best, Fabian
>>
>> 2015-06-08 17:36 GMT+02:00 Robert Metzger<
[hidden email]>:
>> Hi Hilmi,
>>
>> if you just want to count the number of elements, you can also use accumulators, as described here [1].
>> They are much more lightweight.
>>
>> So you need to make your flatMap function a RichFlatMapFunction, then call getExecutionContext().
>> Use a long accumulator to count the elements.
>>
>> If the results with the accumulator are consistent (the exact element count), then there is a severe bug in Flink. But I suspect that the accumulator will give you the same result (off by +-5)
>>
>> Best,
>> Robert
>>
>>
>> [1]:
http://slideshare.net/robertmetzger1/apache-flink-hands-on>>
>> On Mon, Jun 8, 2015 at 3:04 PM, Hilmi Yildirim<
[hidden email]> wrote:
>> Hi,
>> I implemented a simple Flink Batch job which reads from an HBase Cluster of 13 machines and with nearly 100 million rows. The hbase version is 1.0.0-cdh5.4.1. So, I imported hbase-client 1.0.0-cdh5.4.1.
>> I implemented a flatmap which creates a tuple ("a", 1L) for each row . Then, I use groupBy(0).sum(1).writeAsTest. The result should be the number of rows. But, the result is not correct. I run the job multiple times and the result flactuates by +-5. I also run the job for a smaller table with 100.000 rows and the result is correct.
>>
>> Does anyone know the reason for that?
>>
>> Best Regards,
>> Hilmi
>>
>> --
>> --
>> Hilmi Yildirim
>> Software Developer R&D
>>
>>
http://www.neofonie.de>>
>> Besuchen Sie den Neo Tech Blog für Anwender:
>>
http://blog.neofonie.de/>>
>> Folgen Sie uns:
>>
https://plus.google.com/+neofonie>>
http://www.linkedin.com/company/neofonie-gmbh>>
https://www.xing.com/companies/neofoniegmbh>>
>> Neofonie GmbH | Robert-Koch-Platz 4 | 10115 Berlin
>> Handelsregister Berlin-Charlottenburg: HRB 67460
>> Geschäftsführung: Thomas Kitlitschko