Hello,

I am reading Results from an HBase table and processing them with the Batch API. Everything works fine until I receive a ScannerTimeoutException from HBase. Maybe my transformations get stuck or a GC pause happens - hard to tell. The HBase client restarts the scan and the processing continues - except for one problem: every time I receive this exception I observe duplicate Result processing - the Result which was processed just before the ScannerTimeoutException was thrown is processed twice.

Is this expected behavior? Should I be prepared to handle it, and how should I handle it? Keeping track of all processed Results is not feasible in my case.

Here is a simple job demonstrating the issue (HBase scan and RPC timeouts are set to 60 sec):

  public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(1);
      env.createInput(new Src())
         .map(new Mapper())
         .print();
  }

  private static class Mapper implements MapFunction<Tuple1<String>, String> {
      private int cnt = 0;

      @Override
      public String map(Tuple1<String> value) throws Exception {
          // Sleep longer than the scanner timeout on every other record
          // to provoke a ScannerTimeoutException on the next read.
          if (cnt++ % 2 == 0) {
              Thread.sleep(120000);
          }
          return value.f0;
      }
  }

  private static class Src extends AbstractTableInputFormat<Tuple1<String>> {

      @Override
      protected Scan getScanner() {
          Scan scan = new Scan();
          scan.setStartRow(getStartRow());
          scan.setStopRow(getEndRow());
          scan.setCaching(1);
          scan.setCacheBlocks(false);
          return scan;
      }

      @Override
      protected String getTableName() {
          return getTable();
      }

      @Override
      protected Tuple1<String> mapResultToOutType(Result r) {
          return new Tuple1<String>(Bytes.toString(r.getRow()));
      }

      @Override
      public void configure(org.apache.flink.configuration.Configuration parameters) {
          scan = getScanner();
          try {
              table = new HTable(getHadoopConf(), getTableName());
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
  }

Thank you!

Best regards,
  Mark
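For reference, the 60-second timeouts mentioned above would typically be set on the HBase client configuration roughly as sketched below. The property keys are the standard HBase client settings; that this is how Mark's cluster is actually configured is an assumption.

  // Sketch of the assumed HBase client settings; the keys are the standard
  // client-side properties, the values mirror the 60 s mentioned in the post.
  org.apache.hadoop.conf.Configuration hbaseConf = org.apache.hadoop.hbase.HBaseConfiguration.create();
  hbaseConf.setInt("hbase.client.scanner.timeout.period", 60000); // scanner lease timeout
  hbaseConf.setInt("hbase.rpc.timeout", 60000);                   // per-RPC timeout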
What I can tell you is how the HBase input format works. If you look at AbstractTableInputFormat [1], this is the nextRecord() function:

  public T nextRecord(T reuse) throws IOException {
      if (resultScanner == null) {
          throw new IOException("No table result scanner provided!");
      }
      try {
          Result res = resultScanner.next();
          if (res != null) {
              scannedRows++;
              currentRow = res.getRow();
              return mapResultToOutType(res);
          }
      } catch (Exception e) {
          resultScanner.close();
          // workaround for timeout on scan
          LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
          scan.setStartRow(currentRow);
          resultScanner = table.getScanner(scan);
          Result res = resultScanner.next();
          if (res != null) {
              scannedRows++;
              currentRow = res.getRow();
              return mapResultToOutType(res);
          }
      }

      endReached = true;
      return null;
  }

When the resultScanner dies because of a timeout (this happens a lot when you have backpressure and the time between two consecutive reads exceeds the scanner timeout), the code creates a new scanner and restarts from where it was (startRow = currentRow). So there should not be any duplicates (in theory), but this could be the root of the problem.

Best,

On Sat, Nov 23, 2019 at 11:07 PM Mark Davis <[hidden email]> wrote:
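One HBase detail that explains where the duplicate can come from: the start row of a Scan is inclusive. A minimal sketch (the table contents are made up for illustration):

  // Assume the table holds rows "r1", "r2", "r3" and currentRow is "r2",
  // i.e. "r2" was already passed to mapResultToOutType().
  Scan retry = new Scan();
  retry.setStartRow(Bytes.toBytes("r2"));             // start row is inclusive in HBase
  ResultScanner retryScanner = table.getScanner(retry);
  Result first = retryScanner.next();                 // returns "r2" again -> the duplicate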
Maybe the problem is indeed this: the fact that the restarted scan starts from the last seen row. In that case the first result should probably be skipped, because it was already read.

On Mon, Nov 25, 2019 at 10:22 AM Flavio Pompermaier <[hidden email]> wrote:
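A rough sketch of that idea, expressed as a hypothetical helper (firstUnseenResult is not part of the Flink code, it only illustrates the suggestion): after recreating the scanner from currentRow, drop the first Result when it is the row that was already emitted.

  // Hypothetical helper: return the first result from the new scanner that has
  // not been emitted yet; lastEmittedRow is the currentRow saved before the failure.
  private Result firstUnseenResult(ResultScanner scanner, byte[] lastEmittedRow) throws IOException {
      Result res = scanner.next();
      if (res != null && Bytes.equals(res.getRow(), lastEmittedRow)) {
          res = scanner.next(); // skip the duplicate of the already-processed row
      }
      return res;
  }

In the catch block shown above, the retry would then call firstUnseenResult(resultScanner, currentRow) instead of resultScanner.next().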
Hi Flavio,
Yes, you are right, the nextRecord() exception handling is responsible for the duplicate record processing:

  org.apache.hadoop.hbase.client.ScannerTimeoutException: 1038878ms passed since the last invocation, timeout is currently set to 60000
      at org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:453)
      at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:371)
      at org.apache.flink.addons.hbase.AbstractTableInputFormat.nextRecord(AbstractTableInputFormat.java:130)
      at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
      at java.lang.Thread.run(Thread.java:745)
  Caused by: org.apache.hadoop.hbase.UnknownScannerException: org.apache.hadoop.hbase.UnknownScannerException: Name: 135281, already closed?
      at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2389)
      at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
      at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
      at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
      at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
      at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)

But I am not sure that the handling of the HBase exception thrown from ClientScanner.next() is correct. If the call to mapResultToOutType(Result) finished without an error, there is no need to restart from the same row; the new scanner should start from the next row. Is that so, or am I missing something?

Best regards,
  Mark
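A sketch of the retry Mark describes (an illustration only, not necessarily what FLINK-14941 ended up implementing): once currentRow has been handed to mapResultToOutType() successfully, the restarted scan can begin at the smallest key strictly greater than currentRow, which in HBase is the same row key with a single zero byte appended.

  // In the catch block: start the new scanner just past the row that was already emitted.
  // Only safe when currentRow was actually returned to the caller; if the failure happened
  // before that, the scan should still restart at currentRow itself.
  scan.setStartRow(Bytes.add(currentRow, new byte[]{0}));
  resultScanner = table.getScanner(scan);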
> If the call to mapResultToOutType(Result) finished without an error there is no need to restart from the same row.
> The new scanner should start from the next row.
> Is that so or am I missing something?

Yeah, you are right. I've filed https://issues.apache.org/jira/browse/FLINK-14941 to address this bug. Thanks.

On Mon, Nov 25, 2019 at 6:57 PM Mark Davis <[hidden email]> wrote: