RocksDB error with flink 1.2.0


RocksDB error with flink 1.2.0

mclendenin
Starting ~300 CEP patterns with a parallelism of 6, since there are 6 partitions on a Kafka topic. Checkpointing uses RocksDB to Hadoop on a 50-second interval. The cluster is HA with 2 JobManagers and 5 TaskManagers. Getting the following exception:
 
 
java.io.IOException: Error creating ColumnFamilyHandle.
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getColumnFamily(RocksDBKeyedStateBackend.java:830)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createValueState(RocksDBKeyedStateBackend.java:838)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend$1.createValueState(AbstractKeyedStateBackend.java:251)
        at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:128)
        at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:35)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:248)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:557)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:542)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.open(AbstractKeyedCEPPatternOperator.java:102)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.rocksdb.RocksDBException: IO error: /flink/tmp/flink-io-c60bed30-5ca4-4eed-b5b4-14f3c945a46a/job-a150d7d59aafadcf922f2f397c59d6d1_op-KeyedCEPPatternOperator_874_3_uuid-3f3fea55-1af6-43fe-8e20-b213a8e06d28/db/MANIFEST-000006: Too many open files
        at org.rocksdb.RocksDB.createColumnFamily(Native Method)
        at org.rocksdb.RocksDB.createColumnFamily(RocksDB.java:1323)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getColumnFamily(RocksDBKeyedStateBackend.java:823)
        ... 12 more
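
For context, a minimal sketch of how a setup like this is typically wired with the Flink 1.2 CEP API. Event, Alert, and PatternDef are illustrative stand-ins (assumed Serializable, since the anonymous functions capture them); the point is that each CEP.pattern() call adds its own keyed CEP operator, and with the RocksDB backend each such operator opens a separate RocksDB instance per parallel subtask:

```java
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class ManyPatternsSketch {

    // Wires one CEP operator per pattern definition (~300 in the setup above).
    static void wire(KeyedStream<Event, String> events, List<PatternDef> defs) {
        for (final PatternDef def : defs) {
            // Flink 1.2 conditions are plain FilterFunctions.
            Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
                    .where(new FilterFunction<Event>() {
                        @Override
                        public boolean filter(Event e) {
                            return def.matches(e);
                        }
                    });

            // Each call creates a separate KeyedCEPPatternOperator, and hence
            // a separate RocksDB instance per parallel subtask.
            DataStream<Alert> alerts = CEP.pattern(events, pattern)
                    .select(new PatternSelectFunction<Event, Alert>() {
                        @Override
                        public Alert select(Map<String, Event> match) {
                            return new Alert(def.id(), match.get("start"));
                        }
                    });
            alerts.print();
        }
    }
}
```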
 

Re: RocksDB error with flink 1.2.0

Aljoscha Krettek
The problem here is that this will try to open 300 RocksDB instances on each of the TMs (depending on how the parallelism is spread across the machines, it could be more or less). As the exception says, this opens too many files, because each RocksDB instance keeps a directory with several open files in it.

One possible solution would be to increase the limit on open files, but I don’t think that opening 300 RocksDB instances on one machine is a good idea for any size of machine. With this many patterns, you could start thinking about writing the pattern matching yourself and multiplexing the several patterns in one stateful function or operator.
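
If you do raise the limit, the per-instance file budget can also be capped from the Flink side. A minimal sketch, assuming the OptionsFactory hook in the 1.2 contrib RocksDB backend; the cap value and checkpoint URI are illustrative:

```java
import java.io.IOException;

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class CappedOpenFiles {
    public static void configure(StreamExecutionEnvironment env) throws IOException {
        // Checkpoint URI is illustrative; match it to your cluster.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///user/hadoop/flink/checkpoints");
        backend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                // Cap the files each RocksDB instance may hold open, so that
                // ~300 instances stay under the process-wide limit. A low cap
                // trades file handles for table-cache churn.
                return currentOptions.setMaxOpenFiles(64);
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                return currentOptions;
            }
        });
        env.setStateBackend(backend);
    }
}
```

The OS-level open-file limit for the TaskManager process still needs to accommodate the sum across all instances.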

@Stefan, what do you think about having this many Rocks instances?

Best,
Aljoscha


Re: RocksDB error with flink 1.2.0

mclendenin
I changed the maximum number of open files and got past this error, but now I'm seeing errors that it's unable to flush the file. I am checkpointing to HDFS; should I be using the local file system?

Is there a better way to use CEP with multiple patterns, or are you suggesting creating my own pattern matching?
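
For reference, a minimal sketch of the checkpoint wiring described in the thread, with an illustrative HDFS path. Note that with this backend, RocksDB's working files always live on local disk regardless of the URI:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointWiring {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 50-second interval, as in the original post.
        env.enableCheckpointing(50_000);
        // The URI controls only where completed checkpoints are written;
        // RocksDB's working files stay on the TaskManager's local disk
        // (taskmanager.tmp.dirs), as seen in the /flink/tmp path above.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///user/hadoop/flink/checkpoints"));
    }
}
```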


Re: RocksDB error with flink 1.2.0

mclendenin
This is the stack trace I'm getting when checkpointing to HDFS. It happens about once every 3 checkpoints, and I don't see it without parallelism.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedCEPPatternOperator -> Flat Map -> Map -> Sink: Unnamed (4/6).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedCEPPatternOperator -> Flat Map -> Map -> Sink: Unnamed (4/6).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:///user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 in order to obtain the stream state handle
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1010)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:974)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:///user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 in order to obtain the stream state handle
                at java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
                at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:79)
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
                ... 7 more
        Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:///user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 in order to obtain the stream state handle
                at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333)
                at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:580)
                at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:410)
                at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.performOperation(RocksDBKeyedStateBackend.java:298)
                at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.performOperation(RocksDBKeyedStateBackend.java:277)
                at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:37)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
                ... 5 more
        Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 could only be replicated to 0 nodes instead of minReplication (=1).  There are 3 datanode(s) running and no node(s) are excluded in this operation.
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1547)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3107)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:724)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

                at org.apache.hadoop.ipc.Client.call(Client.java:1475)
                at org.apache.hadoop.ipc.Client.call(Client.java:1412)
                at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
                at com.sun.proxy.$Proxy12.addBlock(Unknown Source)
                at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
                at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:498)
                at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
                at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
                at com.sun.proxy.$Proxy13.addBlock(Unknown Source)
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1459)
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1255)
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
        [CIRCULAR REFERENCE:java.io.IOException: Could not flush and close the file system output stream to hdfs:///user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 in order to obtain the stream state handle]

Re: RocksDB error with flink 1.2.0

mclendenin
The top-level exception is similar to the one on this Jira issue, but the root exception is different. That issue says it was fixed in 1.2.0, which is what I'm using:

https://issues.apache.org/jira/browse/FLINK-5663 

Re: RocksDB error with flink 1.2.0

mclendenin
There are only 3 nodes in the HDFS cluster, and running fsck shows the filesystem as healthy.

$ hdfs fsck /user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-43/
17/04/28 16:24:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connecting to namenode via http://localhost/fsck?ugi=hadoop&path=%2Fuser%2Fhadoop%2Fflink%2Fcheckpoints%2Fdc2aee563bebce76e420029525c37892%2Fchk-43
FSCK started by hadoop (auth:SIMPLE) from / for path /user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-43 at Fri Apr 28 16:25:00 EDT 2017
.....Status: HEALTHY
 Total size: 33197 B
 Total dirs: 1
 Total files: 5
 Total symlinks: 0 (Files currently being written: 460)
 Total blocks (validated): 5 (avg. block size 6639 B)
 Minimally replicated blocks: 5 (100.0 %)
 Over-replicated blocks: 0 (0.0 %)
 Under-replicated blocks: 0 (0.0 %)
 Mis-replicated blocks: 0 (0.0 %)
 Default replication factor: 2
 Average block replication: 3.0
 Corrupt blocks: 0
 Missing replicas: 0 (0.0 %)
 Number of data-nodes: 3
 Number of racks: 1
FSCK ended at Fri Apr 28 16:25:00 EDT 2017 in 13 milliseconds


The filesystem under path '/user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-43' is HEALTHY

Re: RocksDB error with flink 1.2.0

Aljoscha Krettek
Hi,
I think the bottleneck might be HDFS. With 300 operators at parallelism 6 you will have 1800 concurrent writes (i.e. connections) to HDFS, which might be too much for the master node and the worker nodes.

This is the same problem that you had on the local filesystem but now in the distributed filesystem.

Best,
Aljoscha


Re: RocksDB error with flink 1.2.0

Elias Levy
Any reason they can't share a single RocksDB state backend instance?



Re: RocksDB error with flink 1.2.0

Aljoscha Krettek
They can’t (with the current design of Flink), because each CEP pattern gets executed by a separate operator.

We could think about multiplexing several patterns inside one operator. That’s what I hinted at earlier as a possible solution, when I mentioned that you could implement your own operator that keeps track of the patterns and does the pattern matching.
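
A minimal sketch of that multiplexing idea, assuming the same hypothetical Event, Alert, and PatternDef types as earlier and simple reset-on-mismatch semantics; real pattern matching (timeouts, branching, overlapping matches) would need richer state:

```java
import java.util.HashMap;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// One keyed operator tracks progress for all patterns, so each parallel
// subtask opens a single RocksDB instance no matter how many logical
// patterns exist. The pattern list must be Serializable.
public class MultiplexedMatcher extends RichFlatMapFunction<Event, Alert> {

    private final List<PatternDef> patterns; // all ~300 definitions
    private transient ValueState<HashMap<Integer, Integer>> progress;

    public MultiplexedMatcher(List<PatternDef> patterns) {
        this.patterns = patterns;
    }

    @Override
    public void open(Configuration parameters) {
        // Per-key map: pattern index -> number of stages matched so far.
        progress = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "pattern-progress",
                TypeInformation.of(new TypeHint<HashMap<Integer, Integer>>() {})));
    }

    @Override
    public void flatMap(Event event, Collector<Alert> out) throws Exception {
        HashMap<Integer, Integer> steps = progress.value();
        if (steps == null) {
            steps = new HashMap<>();
        }
        for (int i = 0; i < patterns.size(); i++) {
            PatternDef def = patterns.get(i);
            int step = steps.containsKey(i) ? steps.get(i) : 0;
            if (def.stage(step).matches(event)) {
                step++;
                if (step == def.length()) {      // full sequence seen
                    out.collect(new Alert(def.id(), event));
                    step = 0;                    // restart this pattern
                }
            } else {
                step = 0;                        // reset-on-mismatch semantics
            }
            steps.put(i, step);
        }
        progress.update(steps);
    }
}
```

Used as events.keyBy(...).flatMap(new MultiplexedMatcher(allDefs)), this creates a single keyed operator, so each subtask opens one RocksDB instance regardless of how many logical patterns are tracked.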

Best,
Aljoscha

Re: RocksDB error with flink 1.2.0

Stephan Ewen
Multiplexing patterns seems like the right thing to do.

Aside from not sharing RocksDB, having 300 separate operators also results in more threads, network connections, etc. That makes it all less efficient...


Re: RocksDB error with flink 1.2.0

mclendenin
I ended up combining all the patterns into one giant CEP pattern and then filtering the output of that pattern instead. That way there was only one RocksDB instance, which led to one large checkpoint instead of lots of small checkpoints. This seems to work, but I still need to do more testing around whether it can handle a high volume.
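
A minimal sketch of that approach, under the same hypothetical Event, Alert, and PatternDef types as earlier: one permissive pattern (the OR of every per-pattern condition) creates a single CEP operator, and hence one RocksDB instance per subtask, with the logical pattern identified and filtered downstream:

```java
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class CombinedPatternSketch {

    static DataStream<Alert> combined(KeyedStream<Event, String> events,
                                      final List<PatternDef> defs) {
        // One giant pattern: matches if ANY of the ~300 conditions holds.
        Pattern<Event, ?> any = Pattern.<Event>begin("start")
                .where(new FilterFunction<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        for (PatternDef def : defs) {
                            if (def.matches(e)) {
                                return true;
                            }
                        }
                        return false;
                    }
                });

        return CEP.pattern(events, any)
                .select(new PatternSelectFunction<Event, Alert>() {
                    @Override
                    public Alert select(Map<String, Event> match) {
                        Event start = match.get("start");
                        // Tag the match with the first logical pattern it satisfies.
                        for (PatternDef def : defs) {
                            if (def.matches(start)) {
                                return new Alert(def.id(), start);
                            }
                        }
                        throw new IllegalStateException("no pattern matched"); // unreachable
                    }
                })
                .filter(new FilterFunction<Alert>() {
                    @Override
                    public boolean filter(Alert a) {
                        return a.isRelevant(); // illustrative downstream filtering
                    }
                });
    }
}
```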