Hi all,
I am facing an odd issue while running a fairly complex duplicate-detection process.

Regards,
Stefano

Error:
org.apache.hadoop.hbase.client.ScannerTimeoutException: 2387347ms passed since the last invocation, timeout is currently set to 900000
    at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:352)
    at org.apache.flink.addons.hbase.TableInputFormat.nextRecord(TableInputFormat.java:106)
    at org.apache.flink.addons.hbase.TableInputFormat.nextRecord(TableInputFormat.java:48)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:195)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:246)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.UnknownScannerException: org.apache.hadoop.hbase.UnknownScannerException: Name: 291, already closed?
    at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3043)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29497)
    at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2012)
    at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:98)
    at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.consumerLoop(SimpleRpcScheduler.java:160)
    at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.access$000(SimpleRpcScheduler.java:38)
    at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler$1.run(SimpleRpcScheduler.java:110)
    at java.lang.Thread.run(Thread.java:745)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:95)
    at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRemoteException(ProtobufUtil.java:283)
    at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:198)
    at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:57)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:90)
    at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:336)
    ... 5 more
Caused by: org.apache.hadoop.hbase.ipc.RemoteWithExtrasException(org.apache.hadoop.hbase.UnknownScannerException): org.apache.hadoop.hbase.UnknownScannerException: Name: 291, already closed?
    at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3043)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29497)
    at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2012)
    at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:98)
    at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.consumerLoop(SimpleRpcScheduler.java:160)
    at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.access$000(SimpleRpcScheduler.java:38)
    at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler$1.run(SimpleRpcScheduler.java:110)
    at java.lang.Thread.run(Thread.java:745)
    at org.apache.hadoop.hbase.ipc.RpcClient.call(RpcClient.java:1458)
    at org.apache.hadoop.hbase.ipc.RpcClient.callBlockingMethod(RpcClient.java:1662)
    at org.apache.hadoop.hbase.ipc.RpcClient$BlockingRpcChannelImplementation.callBlockingMethod(RpcClient.java:1720)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:29900)
    at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:168)
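For context, the "900000" in the trace is the client-side scanner timeout in milliseconds (15 minutes). A minimal sketch of raising it, assuming the hbase.client.scanner.timeout.period property of the HBase client of that era (the 30-minute value is purely illustrative, and the same property would also need to be raised on the region servers, which use it for the scanner lease):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ScannerTimeoutConfig {
        // Builds an HBase client configuration with a longer scanner timeout.
        public static Configuration createConf() {
            Configuration conf = HBaseConfiguration.create();
            // Allow up to 30 minutes between scanner calls before the client
            // gives up (illustrative value).
            conf.setLong("hbase.client.scanner.timeout.period", 30 * 60 * 1000L);
            return conf;
        }
    }

As the rest of the thread shows, though, raising the timeout only pushes the problem further out if the downstream consumer can stall for arbitrarily long.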
Could it be that there are times in the TaskManager when there are large pauses between one inputFormat.nextRecord() call and the next?
On Thu, Nov 27, 2014 at 3:44 PM, Stefano Bortoli <[hidden email]> wrote:
It can be, for several reasons:

1) The reader feeds the data into an operation that cannot always consume data. A sort, for example, takes records until the buffer is full and the sort is triggered. A join may take some records and then pause until the necessary hash tables are fully built.

2) The JVM pauses for garbage collection. In some scenarios this can take quite a while (minutes).

Stephan

On Thu, Nov 27, 2014 at 6:10 PM, Flavio Pompermaier <[hidden email]> wrote:
Thanks Stephan for the support! Unfortunately we are not able to understand which chain of operators causes this problem.
In our case we set the scan timeout to 15 minutes, so I think we can exclude garbage collection; thus this is probably caused by the first option (unfortunately HBase cannot keep a scanner open indefinitely). What can we do to debug this problem? Can you give us more details, or links to the internals involved in such situations? The relation between buffers, operations, and the pauses between two consecutive nextRecord() calls on the same split of the input format is not very clear to me.
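One low-tech way to see whether the scanner really sits idle between calls is sketched below, assuming you can add a field to your input format: remember when the previous nextRecord() was invoked and log a warning when the gap exceeds a threshold. The class name and the threshold are illustrative; this is not part of the Flink or HBase API.

    public class CallGapMonitor {
        private final long thresholdMs;
        private long lastCallMs = -1L;

        public CallGapMonitor(long thresholdMs) {
            this.thresholdMs = thresholdMs;
        }

        // Call this at the start of every nextRecord(); it logs a warning
        // whenever the time since the previous call exceeds the threshold.
        public void tick() {
            long now = System.currentTimeMillis();
            if (lastCallMs >= 0 && now - lastCallMs > thresholdMs) {
                System.err.println("WARN: " + (now - lastCallMs)
                        + " ms elapsed since the previous nextRecord() call");
            }
            lastCallMs = now;
        }
    }

Instantiated with, say, new CallGapMonitor(60000L) and ticked at the top of nextRecord(), it would show whether the pauses really exceed the HBase scanner timeout.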
Hi guys,
sorry for the delay, I meant to catch up on this issue earlier. I have two points that I would like to bring up here:

1. (Less important) What causes the delays? I think you should definitely investigate what causes the issue. The quickest thing to do on your own is to keep a log of the execution times of the code that is directly chained to the data source (that would be at least the mapper). I know doing that by hand is tedious, but tools for that won't be available in the very near future.

2. (Much more important) The delays should not be an issue! HBase can restart a scan at any point within a region at fairly low cost, as long as you know the key from which you want to start reading. So the idea would be to catch exactly the kinds of timeouts you are experiencing (maybe log a warning) and directly create a new scanner that is configured to start at the position of the last successfully retrieved tuple. This approach means we would need to keep a copy of the key of the freshest tuple returned by each scanner in the input format. Of course, that comes with a certain cost, but my guess is that HBase keys are usually not overly large, so performance should not drop significantly. I have an unstable and outdated implementation of that approach somewhere in an old Stratosphere branch, and I could try to polish it up so Flavio can try it out.

tl;dr: If you can't prevent the timeout, embrace it and simply start a new scan from where you left off.

Best,
Marcus

> On 27 Nov 2014, at 20:16, Flavio Pompermaier <[hidden email]> wrote:
>
> Thanks Stephan for the support! Unfortunately we are not able to understand which chain of operators causes this problem.
> In our case we set the scan timeout to 15 minutes, so I think we can exclude garbage collection; thus this is probably caused by the first option (unfortunately HBase cannot keep a scanner open indefinitely).
>
> What can we do to debug this problem? Can you give us more details, or links to the internals involved in such situations? The relation between buffers, operations, and the pauses between two consecutive nextRecord() calls on the same split of the input format is not very clear to me.
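A rough sketch of that restart idea, assuming the plain HBase client API of that era (this is not Marcus's branch; the class and field names are illustrative, and for brevity it retries once on any IOException rather than only on scanner-timeout exceptions):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;

    public class ResumableScan {
        private final HTable table;
        private final Scan scanTemplate;
        private ResultScanner scanner;
        private byte[] lastSeenRow; // key of the last row handed out

        public ResumableScan(HTable table, Scan scanTemplate) throws IOException {
            this.table = table;
            this.scanTemplate = scanTemplate;
            this.scanner = table.getScanner(scanTemplate);
        }

        public Result next() throws IOException {
            try {
                return remember(scanner.next());
            } catch (IOException e) {
                // Scanner lease expired (or another read error): reopen the
                // scan just after the last row we returned and retry once.
                scanner.close();
                Scan resume = new Scan(scanTemplate);
                if (lastSeenRow != null) {
                    // Smallest key strictly greater than lastSeenRow: append 0x00.
                    byte[] startAfter = new byte[lastSeenRow.length + 1];
                    System.arraycopy(lastSeenRow, 0, startAfter, 0, lastSeenRow.length);
                    resume.setStartRow(startAfter);
                }
                scanner = table.getScanner(resume);
                return remember(scanner.next());
            }
        }

        private Result remember(Result r) {
            if (r != null) {
                lastSeenRow = r.getRow();
            }
            return r;
        }

        public void close() {
            scanner.close();
        }
    }

A production version would catch only the scanner-timeout exceptions from the trace above and cap the number of restarts, which is roughly what the workaround Flavio describes below does.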
Hi Marcus! Thanks for sharing!

About point 1: Stefano logged the execution times of each function and they are in the order of hundreds of milliseconds (and the scanner timeout was set to 15 minutes). The only possible cause of this error is that, sometimes, a very long delay occurs between two consecutive nextRecord() calls. In the user code this is practically impossible to measure; it should probably be debugged or monitored at the task manager level (correct me if I am wrong), and that is something I hope will be available in the next versions of Flink.

About point 2: indeed, we implemented a workaround in nextRecord() so that if there is an exception during next() we recreate a brand new ResultScanner and retry (at least once). You can see the current implementation after my refactoring of the extension at

Best,
Flavio

On Fri, Nov 28, 2014 at 8:33 PM, Marcus Leich <[hidden email]> wrote: