How to use OpenTSDB as Source?

How to use OpenTSDB as Source?

Lucas Kinne

Hey guys,

In a university project, we are storing our collected sensor data in an OpenTSDB database.
I am now trying to use this database as a source in Apache Flink, but I can't seem to figure out how to do it.

I have seen that there is no existing connector for this database, but I read in the docs that it is possible to implement a custom (Batch/Streaming)TableSource.
There is a Java client for OpenTSDB, which could be used for that.

So I created a new Java class "OpenTSDBTableSource" that implements "StreamTableSource", "DefinedProctimeAttribute", "DefinedRowtimeAttribute" and "LookupableTableSource", as suggested in the docs.
However, I have no idea how to register this TableSource. The "StreamExecutionEnvironment.addSource" method requires a "SourceFunction" parameter rather than my "TableSource", and the "StreamTableEnvironment.registerTableSource" method is deprecated. The linked docs page points to a topic called "register a TableSource", but that link seems to be dead, and I found no other way to register a TableSource.

I could also write a "SourceFunction" myself, query the OpenTSDB database inside it and return a DataStream built from the fetched collection, but I am not sure whether this is efficient.
And if I did it this "manual" way, how would I avoid pulling the whole database every time?

Any help is much appreciated, even if it is just a small pointer in the right direction.

Thanks in advance!

Sincerely,
Lucas
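Lucas's last question, how to avoid re-reading the whole database on every poll, is commonly handled by remembering a cursor (the newest timestamp already emitted) and querying only for points after it. The plain-Java sketch below illustrates that idea under stated assumptions: `DataPoint`, `TsdbQuery`, `InMemoryTsdb` and `IncrementalPoller` are hypothetical stand-ins, not the OpenTSDB Java client's API, and no Flink classes are used.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record type; a real source would map the OpenTSDB client's results into something like this.
final class DataPoint {
    final String metric;
    final long timestampMs;
    final double value;

    DataPoint(String metric, long timestampMs, double value) {
        this.metric = metric;
        this.timestampMs = timestampMs;
        this.value = value;
    }
}

// Hypothetical query interface: points of one metric with startExclusive < timestamp <= endInclusive,
// sorted by timestamp.
interface TsdbQuery {
    List<DataPoint> fetchRange(String metric, long startExclusive, long endInclusive);
}

// In-memory fake standing in for the database, used only to exercise the poller.
final class InMemoryTsdb implements TsdbQuery {
    private final List<DataPoint> points = new ArrayList<>();

    void write(DataPoint p) {
        points.add(p);
    }

    @Override
    public List<DataPoint> fetchRange(String metric, long startExclusive, long endInclusive) {
        List<DataPoint> out = new ArrayList<>();
        for (DataPoint p : points) {
            if (p.metric.equals(metric) && p.timestampMs > startExclusive && p.timestampMs <= endInclusive) {
                out.add(p);
            }
        }
        out.sort((a, b) -> Long.compare(a.timestampMs, b.timestampMs));
        return out;
    }
}

// Keeps the newest timestamp already emitted so each poll only asks for newer points.
final class IncrementalPoller {
    private final TsdbQuery client;
    private final String metric;
    private long cursorMs; // exclusive lower bound for the next query

    IncrementalPoller(TsdbQuery client, String metric, long startMs) {
        this.client = client;
        this.metric = metric;
        this.cursorMs = startMs;
    }

    // Fetches points in (cursorMs, nowMs] and advances the cursor to the newest one returned.
    List<DataPoint> poll(long nowMs) {
        List<DataPoint> batch = client.fetchRange(metric, cursorMs, nowMs);
        if (!batch.isEmpty()) {
            cursorMs = batch.get(batch.size() - 1).timestampMs;
        }
        return batch;
    }

    long cursor() {
        return cursorMs;
    }
}
```

In a custom Flink `SourceFunction`, a loop like `poll` would typically run inside `run(...)` with a pause between calls, and the cursor would be kept in checkpointed state (for example via the `CheckpointedFunction` interface) so that a restart resumes from the last position instead of the beginning. Points that arrive late, with a timestamp at or before the cursor, are not re-read by this scheme; tolerating them would need a lookback window plus de-duplication.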


Re: How to use OpenTSDB as Source?

Marta Paes Moreira
Hi, Lucas.

There was a lot of refactoring in the Table API / SQL in the last release, so the user experience is not ideal at the moment. Sorry for that.

You can try using the DDL syntax to create your table, as shown in [1,2]. I'm CC'ing Timo and Jark, who should be able to help you further.

Marta

[1] https://flink.apache.org/news/2020/02/20/ddl.html

On Tue, Apr 21, 2020 at 7:02 PM Lucas Kinne <[hidden email]> wrote:


Re: How to use OpenTSDB as Source?

Som Lima
For the sake of brevity, the code example does not show the complete code for setting up the environment with the EnvironmentSettings class:

EnvironmentSettings settings = EnvironmentSettings.newInstance()...

By comparison, the examples that set up the execution environment do not follow the same convention:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
or 

ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
or 

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Is there complete code somewhere?
Please give me a link.



On Wed, 22 Apr 2020, 09:36 Marta Paes Moreira, <[hidden email]> wrote:

Re: How to use OpenTSDB as Source?

Jark Wu-3
Hi Som, 

It describes how to create different TableEnvironments based on EnvironmentSettings. EnvironmentSettings holds the settings used to set up a TableEnvironment.
ExecutionEnvironment is the entry point of the DataSet API, and StreamExecutionEnvironment is the entry point of the DataStream API.
So they have nothing to do with EnvironmentSettings.

Hi Lucas,

I'm sorry that the documentation is missing the part about how to develop connectors for SQL DDL.
The docs will be improved once the new connector API is ready, before the 1.11 release.

If you want to develop an OpenTSDB source for SQL DDL, you also need to develop a factory that implements TableSourceFactory,
and add its fully qualified class name to the `META-INF/services/org.apache.flink.table.factories.TableFactory` file so that the framework can discover it.
You can take `KafkaTableSourceSinkFactory` [1] as an example. 

Please let me know if you have other problems. 

Best,
Jark
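Jark's suggestion relies on two mechanisms: Java's `ServiceLoader`, which instantiates the classes listed in a `META-INF/services/<interface name>` file, and matching the properties from the DDL `WITH` clause against what each factory declares it requires (loosely, the `requiredContext()` idea of Flink's legacy `TableFactory`). The plain-Java sketch below imitates that matching step. `ConnectorFactory`, `OpenTsdbFactory`, `KafkaLikeFactory`, `FactoryMatcher` and the `connector.metric` key are hypothetical and deliberately simplified; this is not Flink's `TableSourceFactory` or its actual discovery code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;

// Hypothetical, simplified stand-in for a connector factory (not Flink's TableSourceFactory).
interface ConnectorFactory {
    // Properties that must appear, with exactly these values, in the table's WITH clause.
    Map<String, String> requiredContext();

    // Builds a (here: textual) source from the full property map.
    String createSource(Map<String, String> properties);
}

// Hypothetical OpenTSDB factory: claims tables declared with connector.type = opentsdb.
final class OpenTsdbFactory implements ConnectorFactory {
    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> ctx = new HashMap<>();
        ctx.put("connector.type", "opentsdb");
        return ctx;
    }

    @Override
    public String createSource(Map<String, String> properties) {
        return "OpenTSDB source for metric " + properties.get("connector.metric");
    }
}

// A second hypothetical factory, so that matching has something to reject.
final class KafkaLikeFactory implements ConnectorFactory {
    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> ctx = new HashMap<>();
        ctx.put("connector.type", "kafka");
        return ctx;
    }

    @Override
    public String createSource(Map<String, String> properties) {
        return "Kafka source";
    }
}

final class FactoryMatcher {
    // Loads implementations listed in META-INF/services/<fully qualified name of ConnectorFactory>;
    // returns an empty list when no such provider file is on the classpath.
    static List<ConnectorFactory> discover() {
        List<ConnectorFactory> found = new ArrayList<>();
        for (ConnectorFactory f : ServiceLoader.load(ConnectorFactory.class)) {
            found.add(f);
        }
        return found;
    }

    // Returns the single factory whose required context is fully contained in the properties.
    static ConnectorFactory match(List<ConnectorFactory> candidates, Map<String, String> properties) {
        List<ConnectorFactory> matches = new ArrayList<>();
        for (ConnectorFactory f : candidates) {
            boolean ok = true;
            for (Map.Entry<String, String> e : f.requiredContext().entrySet()) {
                if (!e.getValue().equals(properties.get(e.getKey()))) {
                    ok = false;
                    break;
                }
            }
            if (ok) {
                matches.add(f);
            }
        }
        if (matches.size() != 1) {
            throw new IllegalStateException("Expected exactly one matching factory, found " + matches.size());
        }
        return matches.get(0);
    }
}
```

In the real setup the provider file would be `META-INF/services/org.apache.flink.table.factories.TableFactory`, listing the fully qualified class name of the OpenTSDB factory on its own line; the `connector.type` key here only mirrors the style of Flink 1.10 DDL properties.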



On Wed, 22 Apr 2020 at 17:51, Som Lima <[hidden email]> wrote:

Re: How to use OpenTSDB as Source?

Som Lima
Thanks for the link.


On Wed, 22 Apr 2020, 12:19 Jark Wu, <[hidden email]> wrote: