DataSet API: HBase ScannerTimeoutException and double Result processing


Mark Davis
Hello,

I am reading Results from an HBase table and processing them with the Batch API. Everything works fine until I receive a ScannerTimeoutException from HBase.
Maybe my transformations get stuck or a GC pause happens - hard to tell. The HBase client restarts the scan and the processing continues.
Except for one problem - every time I receive this exception I observe duplicate Result processing: the Result that was processed just before the ScannerTimeoutException was thrown is processed twice.

Is this expected behavior? Should I be prepared to handle it?
And how should I handle it? Keeping track of all processed Results is not feasible in my case.

Here is a simple job demonstrating the issue (HBase scan and RPC timeouts are set to 60 sec):

Thank you!

Best regards,
Mark


  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env.createInput(new Src())
        .map(new Mapper())
        .print();
  }

  private static class Mapper implements MapFunction<Tuple1<String>, String> {

    private int cnt = 0;

    @Override
    public String map(Tuple1<String> value) throws Exception {
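      // every other record sleeps for 120 s, well past the 60 s scanner timeout,
      // so the next resultScanner.next() call fails with a ScannerTimeoutException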
      if (cnt++ % 2 == 0) {
        Thread.sleep(120000);
      }
      return value.f0;
    }

  }

  private static class Src extends AbstractTableInputFormat<Tuple1<String>> {

    @Override
    protected Scan getScanner() {
      Scan scan = new Scan();
      scan.setStartRow(getStartRow());
      scan.setStopRow(getEndRow());
      scan.setCaching(1);
      scan.setCacheBlocks(false);
      return scan;
    }

    @Override
    protected String getTableName() {
      return getTable();
    }

    @Override
    protected Tuple1<String> mapResultToOutType(Result r) {
      return new Tuple1<String>(Bytes.toString(r.getRow()));
    }

    @Override
    public void configure(org.apache.flink.configuration.Configuration parameters) {
      scan = getScanner();
      try {
        table = new HTable(getHadoopConf(), getTableName());
      } catch (IOException e) {
        e.printStackTrace();
      }
    }

  }

Re: DataSet API: HBase ScannerTimeoutException and double Result processing

Flavio Pompermaier
What I can tell you is how the HBase input format works. If you look at AbstractTableInputFormat [1], this is the nextRecord() function:
    public T nextRecord(T reuse) throws IOException {
        if (resultScanner == null) {
            throw new IOException("No table result scanner provided!");
        }
        try {
            Result res = resultScanner.next();
            if (res != null) {
                scannedRows++;
                currentRow = res.getRow();
                return mapResultToOutType(res);
            }
        } catch (Exception e) {
            resultScanner.close();
            // workaround for timeout on scan
            LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
            scan.setStartRow(currentRow);
            resultScanner = table.getScanner(scan);
            Result res = resultScanner.next();
            if (res != null) {
                scannedRows++;
                currentRow = res.getRow();
                return mapResultToOutType(res);
            }
        }

        endReached = true;
        return null;
    }
When the resultScanner dies because of a timeout (this happens a lot when you have backpressure and the time between 2 consecutive reads exceeds the scanner timeout), the code creates a new scanner and restarts from where it was (startRow = currentRow).
So there should not be any duplicates (in theory), but this could be the root of the problem..
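
As a possible job-side workaround (just an untested sketch, not anything provided by Flink or flink-hbase), you could drop consecutive duplicates right after the source. In your example the source runs with parallelism 1 and the re-read row always arrives immediately after the original, so remembering only the last row key is enough:

  // hypothetical helper, not part of Flink: drops a record whose row key equals
  // the one seen immediately before it; only valid while duplicates are always
  // emitted right after the original and the source parallelism is 1
  // (FilterFunction is org.apache.flink.api.common.functions.FilterFunction)
  private static class DropConsecutiveDuplicates implements FilterFunction<Tuple1<String>> {

    private String lastRowKey;

    @Override
    public boolean filter(Tuple1<String> value) {
      boolean duplicate = value.f0.equals(lastRowKey);
      lastRowKey = value.f0;
      return !duplicate; // keep the record only if it differs from the previous one
    }
  }

and plug it in between the source and the mapper:

    env.createInput(new Src())
        .filter(new DropConsecutiveDuplicates())
        .map(new Mapper())
        .print();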

Best,


Re: DataSet API: HBase ScannerTimeoutException and double Result processing

Flavio Pompermaier
Maybe the problem is indeed this: the fact that the restarted scan starts from the last seen row. In that case the first result should probably be skipped, because it was already read..


Re: DataSet API: HBase ScannerTimeoutException and double Result processing

Mark Davis
Hi Flavio,


> When the resultScanner dies because of a timeout (this happens a lot when you have backpressure and the time between 2 consecutive reads exceeds the scanner timeout), the code creates a new scanner and restarts from where it was (startRow = currentRow).
> So there should not be any duplicates (in theory), but this could be the root of the problem..

Yes, you are right, the nextRecord() exception handling is responsible for the duplicate record processing:

org.apache.hadoop.hbase.client.ScannerTimeoutException: 1038878ms passed since the last invocation, timeout is currently set to 60000
    at org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:453)
    at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:371)
    at org.apache.flink.addons.hbase.AbstractTableInputFormat.nextRecord(AbstractTableInputFormat.java:130)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.UnknownScannerException: org.apache.hadoop.hbase.UnknownScannerException: Name: 135281, already closed?
    at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2389)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
    at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
    at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
    at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
    at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)

But I am not sure that the handling of the HBase exception thrown from ClientScanner.next() is correct.
If the call to mapResultToOutType(Result) finished without an error, there is no need to restart from the same row.
The new scanner should start from the next row.
Is that so, or am I missing something?
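
Just to illustrate what I mean (a rough, untested sketch, not a proper patch): the retry branch could remember that currentRow was already emitted and skip it after recreating the scanner, e.g. something like:

  // rough sketch only: same structure as nextRecord() above, but the restarted
  // scanner skips the row that was already returned to the caller
  public T nextRecord(T reuse) throws IOException {
    if (resultScanner == null) {
      throw new IOException("No table result scanner provided!");
    }
    try {
      Result res = resultScanner.next();
      if (res != null) {
        scannedRows++;
        currentRow = res.getRow();
        return mapResultToOutType(res);
      }
    } catch (Exception e) {
      resultScanner.close();
      LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
      // the start row is inclusive, so the restarted scan re-reads currentRow...
      scan.setStartRow(currentRow);
      resultScanner = table.getScanner(scan);
      Result res = resultScanner.next();
      // ...drop it here if it was already emitted before the timeout
      // (Bytes is org.apache.hadoop.hbase.util.Bytes)
      if (res != null && Bytes.equals(res.getRow(), currentRow)) {
        res = resultScanner.next();
      }
      if (res != null) {
        scannedRows++;
        currentRow = res.getRow();
        return mapResultToOutType(res);
      }
    }

    endReached = true;
    return null;
  }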

Best regards,
  Mark


Re: DataSet API: HBase ScannerTimeoutException and double Result processing

OpenInx
> If the call to mapResultToOutType(Result) finished without an error there is no need to restart from the same row.
> The new scanner should start from the next row.
> Is that so or am I missing something?

Yeah, you are right. I've filed the issue https://issues.apache.org/jira/browse/FLINK-14941 to address this bug.
Thanks.

