Starting ~300 CEP patterns with parallelism of 6, since there are 6 partitions on a Kafka topic. Checkpointing with RocksDB to Hadoop at an interval of 50 seconds. The cluster is HA with 2 JMs and 5 TMs. Getting the following exception:
java.io.IOException: Error creating ColumnFamilyHandle.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getColumnFamily(RocksDBKeyedStateBackend.java:830)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createValueState(RocksDBKeyedStateBackend.java:838)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend$1.createValueState(AbstractKeyedStateBackend.java:251)
    at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:128)
    at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:35)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:248)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:557)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:542)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.open(AbstractKeyedCEPPatternOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.rocksdb.RocksDBException: IO error: /flink/tmp/flink-io-c60bed30-5ca4-4eed-b5b4-14f3c945a46a/job-a150d7d59aafadcf922f2f397c59d6d1_op-KeyedCEPPatternOperator_874_3_uuid-3f3fea55-1af6-43fe-8e20-b213a8e06d28/db/MANIFEST-000006: Too many open files
    at org.rocksdb.RocksDB.createColumnFamily(Native Method)
    at org.rocksdb.RocksDB.createColumnFamily(RocksDB.java:1323)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getColumnFamily(RocksDBKeyedStateBackend.java:823)
    ... 12 more
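For reference, the setup described above corresponds roughly to the following sketch (a minimal, assumed reconstruction; the checkpoint path is taken from the stack traces in this thread, everything else is a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CepJobSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(6);            // one subtask per Kafka partition
        env.enableCheckpointing(50_000);  // checkpoint every 50 seconds
        env.setStateBackend(new RocksDBStateBackend("hdfs:///user/hadoop/flink/checkpoints"));

        // The Kafka source, keyBy, and the ~300 CEP.pattern(...) definitions would go here;
        // each pattern becomes its own operator with its own RocksDB instance.

        env.execute("cep-job");
    }
}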
The problem here is that this will try to open 300 RocksDB instances on each of the TMs (depending on how the parallelism is spread between the machines this could be more or less). As the exception says, this will open too many files because each RocksDB instance has a directory with several files in it.
One possible solution would be to increase the limit on open files, but I don’t think that opening 300 RocksDB instances on one machine is a good idea for any size of machine. With this many patterns you could start thinking about writing the pattern matching yourself and multiplexing the several patterns in one stateful function or operator.

@Stefan, what do you think about having this many RocksDB instances?

Best,
Aljoscha
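Besides raising the OS-level open-file limit (ulimit), RocksDB’s own per-instance open-file budget can also be capped. Below is a rough sketch assuming the OptionsFactory hook exposed by the 1.2-era RocksDB state backend; the value 64 is purely illustrative and this is not verified against this exact setup, and with 300 instances per machine even a small per-instance cap multiplies quickly:

import java.io.IOException;

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class RocksDbOptionsSketch {

    // Builds a RocksDB backend that caps how many files each RocksDB instance keeps open.
    public static RocksDBStateBackend buildBackend() throws IOException {
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///user/hadoop/flink/checkpoints");

        backend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                // illustrative value: per-instance limit on open SST/manifest/log files
                return currentOptions.setMaxOpenFiles(64);
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                return currentOptions; // keep column-family defaults
            }
        });

        return backend;
    }
}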
I changed the max number of open files and got past this error, but now I'm seeing errors saying it's unable to flush and close the checkpoint file. I am checkpointing to HDFS; should I be using the local file system instead?
Is there any better way to use CEP with multiple patterns, or are you suggesting I create my own pattern matching?
This is the stack trace I'm getting when checkpointing to HDFS. It happens roughly once every 3 checkpoints, and I don't see it when running without parallelism.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedCEPPatternOperator -> Flat Map -> Map -> Sink: Unnamed (4/6).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedCEPPatternOperator -> Flat Map -> Map -> Sink: Unnamed (4/6).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:///user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1010)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:974)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:///user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 in order to obtain the stream state handle
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:79)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
        ... 7 more
    Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:///user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 in order to obtain the stream state handle
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:580)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:410)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.performOperation(RocksDBKeyedStateBackend.java:298)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.performOperation(RocksDBKeyedStateBackend.java:277)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:37)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
        ... 5 more
    Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 could only be replicated to 0 nodes instead of minReplication (=1). There are 3 datanode(s) running and no node(s) are excluded in this operation.
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1547)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3107)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:724)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
        at org.apache.hadoop.ipc.Client.call(Client.java:1475)
        at org.apache.hadoop.ipc.Client.call(Client.java:1412)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
        at com.sun.proxy.$Proxy12.addBlock(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
        at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy13.addBlock(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1459)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1255)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
    [CIRCULAR REFERENCE:java.io.IOException: Could not flush and close the file system output stream to hdfs:///user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 in order to obtain the stream state handle]
The top-level exception is similar to the one on this Jira issue, but the root exception is different. That issue says it was fixed in 1.2.0, which is what I'm using:
https://issues.apache.org/jira/browse/FLINK-5663
There are only 3 nodes in the HDFS cluster and when running fsck it shows the filesystem as healthy.
$ hdfs fsck /user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-43/
17/04/28 16:24:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connecting to namenode via http://localhost/fsck?ugi=hadoop&path=%2Fuser%2Fhadoop%2Fflink%2Fcheckpoints%2Fdc2aee563bebce76e420029525c37892%2Fchk-43
FSCK started by hadoop (auth:SIMPLE) from / for path /user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-43 at Fri Apr 28 16:25:00 EDT 2017
.....Status: HEALTHY
 Total size:                    33197 B
 Total dirs:                    1
 Total files:                   5
 Total symlinks:                0 (Files currently being written: 460)
 Total blocks (validated):      5 (avg. block size 6639 B)
 Minimally replicated blocks:   5 (100.0 %)
 Over-replicated blocks:        0 (0.0 %)
 Under-replicated blocks:       0 (0.0 %)
 Mis-replicated blocks:         0 (0.0 %)
 Default replication factor:    2
 Average block replication:     3.0
 Corrupt blocks:                0
 Missing replicas:              0 (0.0 %)
 Number of data-nodes:          3
 Number of racks:               1
FSCK ended at Fri Apr 28 16:25:00 EDT 2017 in 13 milliseconds

The filesystem under path '/user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-43' is HEALTHY
Hi,
I think the bottleneck might be HDFS here. With 300 operators at parallelism 6 you will have 1800 concurrent writes (i.e. connections) to HDFS, which might be too much for the master node and the worker nodes. This is the same problem you had on the local filesystem, but now in the distributed filesystem.

Best,
Aljoscha
Any reason they can't share a single RocksDB state backend instance?
They can’t (with the current design of Flink) because each CEP pattern gets executed by a separate operator.
We could think about doing multiplexing of several patterns inside one operator. It’s what I hinted at earlier as a possible solution when I mentioned that you could implement your own operator that keeps track of the patterns and does the pattern matching.

Best,
Aljoscha
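As an illustration of that idea, here is a minimal sketch (not the Flink CEP machinery itself) of a single keyed operator that multiplexes many hypothetical two-step "A followed by B" rules, so that all of them share one keyed state backend instance. Events are plain strings and the rule/predicate types are invented for the example; timeouts and windowing, which CEP normally handles, are left out:

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class MultiPatternMatcher extends RichFlatMapFunction<String, String> {

    // Serializable predicate so the rule definitions can ship with the function.
    public interface EventPredicate extends Serializable {
        boolean test(String event);
    }

    // A hypothetical rule: an id plus conditions for the first and second step.
    public static class TwoStepRule implements Serializable {
        final String id;
        final EventPredicate first;
        final EventPredicate second;

        public TwoStepRule(String id, EventPredicate first, EventPredicate second) {
            this.id = id;
            this.first = first;
            this.second = second;
        }
    }

    private final List<TwoStepRule> rules;

    // rule id -> whether the first step has already been seen for the current key
    private transient ValueState<HashMap<String, Boolean>> progress;

    public MultiPatternMatcher(List<TwoStepRule> rules) {
        this.rules = rules;
    }

    @Override
    public void open(Configuration parameters) {
        progress = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "pattern-progress",
                TypeInformation.of(new TypeHint<HashMap<String, Boolean>>() {})));
    }

    @Override
    public void flatMap(String event, Collector<String> out) throws Exception {
        HashMap<String, Boolean> seenFirst = progress.value();
        if (seenFirst == null) {
            seenFirst = new HashMap<>();
        }
        for (TwoStepRule rule : rules) {
            boolean firstSeen = Boolean.TRUE.equals(seenFirst.get(rule.id));
            if (!firstSeen && rule.first.test(event)) {
                seenFirst.put(rule.id, true);          // first step matched, remember it
            } else if (firstSeen && rule.second.test(event)) {
                out.collect("rule " + rule.id + " matched");
                seenFirst.put(rule.id, false);         // reset the rule after a full match
            }
        }
        progress.update(seenFirst);
    }
}

It would be applied as keyedStream.flatMap(new MultiPatternMatcher(rules)), keeping a single stateful operator regardless of how many rules are registered.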
Multiplexing patterns seems like the right thing to do.
Aside from not sharing RocksDB, having 300 separate operators also results in more threads, network connections, etc. That makes it all less efficient...
I ended up combining all the patterns into one giant CEP pattern and then filtering the output of that pattern instead. That way there was only one RocksDB instance, which led to large checkpoints instead of lots of small ones. This seems to work, but I still need to do more testing around whether it can handle a high volume.
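For anyone following the same route, here is a rough sketch of the "one broad pattern, filter the matches afterwards" approach, assuming the Flink 1.2 CEP API (conditions as FilterFunctions, a match as a Map from state name to event) and made-up string events and rule conditions:

import java.util.Map;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.util.Collector;

public class CombinedPatternSketch {

    public static DataStream<String> applyCombinedPattern(KeyedStream<String, String> keyedStream) {

        // One broad pattern whose conditions are the union of the individual rules,
        // so only a single CEP operator (and RocksDB instance) is created.
        Pattern<String, ?> broad = Pattern.<String>begin("start")
                .where(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String event) {
                        // union of the "first step" conditions of all rules (illustrative)
                        return event.startsWith("A") || event.startsWith("B");
                    }
                })
                .followedBy("end")
                .where(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String event) {
                        return event.endsWith("!"); // illustrative "second step" condition
                    }
                });

        PatternStream<String> matches = CEP.pattern(keyedStream, broad);

        // Filter the combined matches back down to the individual rules afterwards.
        return matches.flatSelect(new PatternFlatSelectFunction<String, String>() {
            @Override
            public void flatSelect(Map<String, String> match, Collector<String> out) {
                String start = match.get("start");
                String end = match.get("end");
                if (start.startsWith("A")) {
                    out.collect("rule-A matched: " + start + " -> " + end);
                } else if (start.startsWith("B")) {
                    out.collect("rule-B matched: " + start + " -> " + end);
                }
                // matches that satisfy none of the original rules are simply dropped
            }
        });
    }
}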