Writing a custom Rocksdb statistics collector


Harshvardhan Agrawal
Hi,
I am currently trying to integrate RocksDB statistics into my pipeline.
The basic idea is to pass RocksDB stats through the same pipeline that does our processing and write them to Elasticsearch so that we can visualize them in Kibana.
I have written a custom source function that takes the DBOptions object obtained from the stream environment's state backend and uses it to continuously query RocksDB for metrics. Here's the code:
public class RocksDBStatsStreamRunner {

    public static void main(String[] args) throws IOException {

        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("/tmp", true);
        rocksDBStateBackend.setOptions(new MyOptionsFactory());
        streamEnv.setStateBackend(rocksDBStateBackend);

        DBOptions dbOptions = ((RocksDBStateBackend) streamEnv.getStateBackend()).getDbOptions();
        streamEnv.addSource(new RocksDBStatisticsSource(dbOptions));
    }
}

public RocksDBStatisticsSource(DBOptions dbOptions) {
    this(dbOptions, DEFAULT_SLEEP_TIME_MS);
}

public RocksDBStatisticsSource(DBOptions dbOptions, long waitTimeMs) {
    this.dbOptions = dbOptions;
    this.waitTimeMs = waitTimeMs;
}

@Override
public void stop() {
    this.isRunning = false;
}

@Override
public void run(SourceContext sourceContext) throws Exception {
    while (isRunning) {
        // create rocksdb statistics object
        // query rocksdb for statistics using the options field
        // sourceContext.collect(rocksdbStats object)
        // sleep
    }
}

@Override
public void cancel() {
    this.isRunning = false;
}
I am assuming that we will get a separate RocksDB options object for each of the slots. Is this a good way to approach this problem? Do you think this will work?
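For reference, the polling loop outlined in the comments of run() could look roughly like the sketch below. It uses plain Java only: the `reader` and `emitter` parameters are hypothetical stand-ins for "query RocksDB for statistics" and `sourceContext.collect(...)`, and the class name `StatsPoller` is made up for illustration. The points it tries to show are that the flag should be `volatile`, because cancel() is called from a different thread than run(), and that an interrupt during the sleep should end the loop.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of Flink or RocksDB.
class StatsPoller<T> implements Runnable {
    private final Supplier<T> reader;   // stand-in for reading one statistics snapshot
    private final Consumer<T> emitter;  // stand-in for sourceContext.collect(...)
    private final long waitTimeMs;
    // volatile so that a cancel() from another thread is seen by the loop
    private volatile boolean isRunning = true;

    StatsPoller(Supplier<T> reader, Consumer<T> emitter, long waitTimeMs) {
        this.reader = reader;
        this.emitter = emitter;
        this.waitTimeMs = waitTimeMs;
    }

    @Override
    public void run() {
        while (isRunning) {
            // read one snapshot and hand it downstream
            emitter.accept(reader.get());
            try {
                Thread.sleep(waitTimeMs);
            } catch (InterruptedException e) {
                // restore the interrupt flag and leave the loop
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void cancel() {
        isRunning = false;
    }
}
```

In a real source function the body of run() would take the place of this loop, with cancel() and stop() both clearing the flag as in the code above.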

Thanks in advance! :)
--
Regards,
Harshvardhan Agrawal

Re: Writing a custom Rocksdb statistics collector

Yun Tang
Hi Harshvardhan

First of all, `DBOptions` is not serializable, so I don't think you can pass it to the source constructor.

I also wonder whether the given `DBOptions` could be used to query RocksDB's statistics at all, since it is not the actual options object used to open RocksDB.

We have tried reporting RocksDB's statistics each time the RocksDB state backend takes a snapshot, but this solution requires modifying the RocksDB state backend's source code. By the way, Flink supports reporting some native metrics [1]; I hope this is helpful.
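To illustrate the serializability point, here is a minimal plain-Java sketch. It assumes the source function is shipped to the workers with standard Java serialization, so every non-transient field has to be serializable. `NativeHandle`, `EagerSource` and `LazySource` are hypothetical stand-ins (not Flink or RocksDB classes): the first plays the role of `DBOptions`, the second has the same shape as the source in the question, and the third keeps only serializable settings and creates the handle after deserialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a native handle such as DBOptions: deliberately not Serializable.
class NativeHandle {
    final long pointer = 42L;
}

// Same shape as the source in the question: the handle is captured by the constructor.
class EagerSource implements Serializable {
    private static final long serialVersionUID = 1L;
    private final NativeHandle handle;

    EagerSource(NativeHandle handle) {
        this.handle = handle;
    }
}

// Alternative shape: ship only serializable settings and build the handle on the worker.
class LazySource implements Serializable {
    private static final long serialVersionUID = 1L;
    private final long waitTimeMs;
    private transient NativeHandle handle; // transient fields are skipped by Java serialization

    LazySource(long waitTimeMs) {
        this.waitTimeMs = waitTimeMs;
    }

    long waitTimeMs() {
        return waitTimeMs;
    }

    // Would be called from an open()-style hook once the object has been deserialized.
    NativeHandle handle() {
        if (handle == null) {
            handle = new NativeHandle();
        }
        return handle;
    }
}

class SerializationCheck {
    // Returns true if Java serialization accepts the object,
    // false if it hits a non-serializable field.
    static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new EagerSource(new NativeHandle()))); // false
        System.out.println(isSerializable(new LazySource(1000L)));               // true
    }
}
```

The lazy variant only solves the serialization problem. Whether the handle created on the worker is the one RocksDB was actually opened with is a separate question.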


Best
Yun Tang

From: Harshvardhan Agrawal <[hidden email]>
Sent: Thursday, January 31, 2019 0:23
To: user
Subject: Writing a custom Rocksdb statistics collector
 

Re: Writing a custom Rocksdb statistics collector

Harshvardhan Agrawal
It looks like the DBOptions that are created by the OptionsFactory class are used for opening RocksDB.

And yes, I missed the fact that DBOptions is not serializable. Thanks for pointing that out. I will go through the metrics exposed via Flink. But does this mean that there is no good way of getting native RocksDB metrics in Flink?

On Wed, Jan 30, 2019 at 23:07 Yun Tang <[hidden email]> wrote:
--
Regards,
Harshvardhan

Re: Writing a custom Rocksdb statistics collector

Yun Tang
The DBOptions created by the OptionsFactory are used to open RocksDB. However, calling `RocksDBStateBackend#getDbOptions()` does not return that exact instance but a newly created one. The private `dbOptions` field within `RocksDBKeyedStateBackend` is the one you want.
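The difference between the two instances can be modelled with a small plain-Java sketch. Everything in it is a hypothetical stand-in that only mimics the behavior described above (`ModelOptions`, `ModelOptionsFactory` and `ModelBackend` are not the Flink or RocksDB classes, and the counter merely plays the role of a statistics object). It assumes that the factory is invoked both by the getter and when the database is opened, each time on a fresh object.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-ins that only model the behavior described above.
class ModelOptions {
    AtomicLong statistics; // stands in for an attached statistics object
}

interface ModelOptionsFactory {
    ModelOptions createDBOptions(ModelOptions current);
}

class ModelBackend {
    private ModelOptionsFactory factory = current -> current;
    private ModelOptions opened; // private, like the field inside the keyed backend

    void setOptions(ModelOptionsFactory factory) {
        this.factory = factory;
    }

    // Models the getter: every call runs the factory on a brand-new object.
    ModelOptions getDbOptions() {
        return factory.createDBOptions(new ModelOptions());
    }

    // Models opening the database: the instance used here never leaves the backend.
    void open() {
        opened = getDbOptions();
    }

    // Models the database recording an event against the statistics it was opened with.
    void recordWrite() {
        if (opened != null && opened.statistics != null) {
            opened.statistics.incrementAndGet();
        }
    }
}

class OptionsIdentityDemo {
    public static void main(String[] args) {
        // Attaching statistics to the getter's result has no effect on the opened instance.
        ModelBackend plain = new ModelBackend();
        plain.open();
        ModelOptions copy = plain.getDbOptions();
        copy.statistics = new AtomicLong();
        plain.recordWrite();
        System.out.println(copy.statistics.get()); // 0

        // Attaching them inside the factory reaches the instance used for opening.
        AtomicLong shared = new AtomicLong();
        ModelBackend wired = new ModelBackend();
        wired.setOptions(current -> {
            current.statistics = shared;
            return current;
        });
        wired.open();
        wired.recordWrite();
        System.out.println(shared.get()); // 1
    }
}
```

If the real backend behaves like this model, anything that should observe the opened database would have to be attached inside the OptionsFactory, and whatever reads it afterwards would have to run in the same JVM as the backend.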

Best
Yun Tang

From: Harshvardhan Agrawal <[hidden email]>
Sent: Friday, February 1, 2019 1:35
To: Yun Tang
Cc: user
Subject: Re: Writing a custom Rocksdb statistics collector
 