Poor performance with large keys using RocksDB and MapState

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Poor performance with large keys using RocksDB and MapState

ירון שני
Hello,
I have a poor throughput issue, and I think I managed to reproduce it using the following code: 
val conf = new Configuration()
conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(6 * 1000))
conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(8 * 1000))
conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(256))
conf.set(RocksDBConfigurableOptions.BLOCK_SIZE, new MemorySize(8 * 1000))
val be = new RocksDBStateBackend("file:///tmp")
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
.setStateBackend(be)

env.setParallelism(3)
env.getConfig.enableObjectReuse()
val r = new scala.util.Random(31)
val randStr = r.nextString(4992)
val s = env.fromElements(1).process((value: Int, ctx: _root_.org.apache.flink.streaming.api.functions.ProcessFunction[Int, _root_.scala.Predef.String]#Context, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]) => {
for (a <- 1 to 1000 * 1000 * 10) {
out.collect( randStr + r.nextString(8) )
      }
}).keyBy(a=>a).process(new ProcessFunction[String, String] {
private var someState: MapState[String, String] = _

override def open(parameters: Configuration): Unit = {
someState = getRuntimeContext.getMapState(
new MapStateDescriptor[String, String]("someState", createTypeInformation[String], createTypeInformation[String])
)
}

override def processElement(value: _root_.scala.Predef.String, ctx: _root_.org.apache.flink.streaming.api.functions.ProcessFunction[_root_.scala.Predef.String, _root_.scala.Predef.String]#Context, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = {
if(!someState.contains(value)) {
someState.put(value, value)
}
}
})
env.execute()
This has really poor throughput. 
Now changing 
out.collect( randStr + r.nextString(8) )
to
out.collect( r.nextString(8) + randStr)
Solves the issue.
Is there any way easy to fix this? 
I tried to use hash index, but it required rocks db option called "prefix extractor" which I don't know how to fill yet, and no idea if it will fix it. 
If anyone encountered that before, I would really use some advice/help. Thanks!







Reply | Threaded
Open this post in threaded view
|

Re: Poor performance with large keys using RocksDB and MapState

Yun Tang
Reply | Threaded
Open this post in threaded view
|

Re: Poor performance with large keys using RocksDB and MapState

ירון שני
Thanks Yun!,
I used this option, and it greatly helped

val be = new RocksDBStateBackend("file:///tmp")class MyConfig extends DefaultConfigurableOptionsFactory {  override def createColumnOptions(currentOptions: ColumnFamilyOptions, handlesToClose: util.Collection[AutoCloseable]): ColumnFamilyOptions = {
super.createColumnOptions(currentOptions, handlesToClose).optimizeForPointLookup(2000)
}
}
be.setRocksDBOptions(new MyConfig)
be.getMemoryConfiguration.setUseManagedMemory(false)

But now I cant use the RocksDBSharedResources because of setCacheIndexAndFilterBlocks seems to make the hash index not work properly and the performance is bad again. 
Only when using  be.getMemoryConfiguration.setUseManagedMemory(false) and skipping setCacheIndexAndFilterBlocks , only then its working :(





On Fri, Sep 25, 2020 at 9:56 AM Yun Tang <[hidden email]> wrote:
Hi

If you want to improve the performance of point lookup, you could try to use additional hash index. This feature needs to pass a prefix extractor, however, original interface is not exposed out directly in java API.

You could try to call columnFamilyOptions.optimizeForPointLookup(blockCacheSizeMb) and it would use NoopTransform prefix extractor by default[1].
Please also consider to use this feature after Flink-1.10.2 due to RocksDB internal bug [2].


Best
Yun Tang



From: ירון שני <[hidden email]>
Sent: Wednesday, September 23, 2020 23:56
To: [hidden email] <[hidden email]>
Subject: Poor performance with large keys using RocksDB and MapState
 
Hello,
I have a poor throughput issue, and I think I managed to reproduce it using the following code: 
val conf = new Configuration()
conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(6 * 1000))
conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(8 * 1000))
conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(256))
conf.set(RocksDBConfigurableOptions.BLOCK_SIZE, new MemorySize(8 * 1000))
val be = new RocksDBStateBackend("file:///tmp")
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
.setStateBackend(be)

env.setParallelism(3)
env.getConfig.enableObjectReuse()
val r = new scala.util.Random(31)
val randStr = r.nextString(4992)
val s = env.fromElements(1).process((value: Int, ctx: _root_.org.apache.flink.streaming.api.functions.ProcessFunction[Int, _root_.scala.Predef.String]#Context, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]) => {
for (a <- 1 to 1000 * 1000 * 10) {
out.collect( randStr + r.nextString(8) )
      }
}).keyBy(a=>a).process(new ProcessFunction[String, String] {
private var someState: MapState[String, String] = _

override def open(parameters: Configuration): Unit = {
someState = getRuntimeContext.getMapState(
new MapStateDescriptor[String, String]("someState", createTypeInformation[String], createTypeInformation[String])
)
}

override def processElement(value: _root_.scala.Predef.String, ctx: _root_.org.apache.flink.streaming.api.functions.ProcessFunction[_root_.scala.Predef.String, _root_.scala.Predef.String]#Context, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = {
if(!someState.contains(value)) {
someState.put(value, value)
}
}
})
env.execute()
This has really poor throughput. 
Now changing 
out.collect( randStr + r.nextString(8) )

to
out.collect( r.nextString(8) + randStr)
Solves the issue.
Is there any way easy to fix this? 
I tried to use hash index, but it required rocks db option called "prefix extractor" which I don't know how to fill yet, and no idea if it will fix it. 
If anyone encountered that before, I would really use some advice/help. Thanks!








Reply | Threaded
Open this post in threaded view
|

Re: Poor performance with large keys using RocksDB and MapState

Yun Tang