Creating a Source function to read data continuously

Creating a Source function to read data continuously

Soheil Pourbafrani
Hi,

By extending the "RichInputFormat" class I was able to create my own MySQL input. I want to use it to read data continuously from a table, but I observed that the "RichInputFormat" class reads all the data and then the job finishes.

I guess that to read data continuously I need to extend "SourceFunction", but I observed that it has only two methods: run() and cancel().

So I was wondering: is it possible to implement a new class that reads data from MySQL tables continuously, like what we can do with the Kafka connector?

Thanks

Re: Creating a Source function to read data continuously

Caizhi Weng
Hi Soheil,

It's not recommended to implement a streaming source using `InputFormat` (it's mainly used for batch sources). To implement a streaming source, `SourceFunction` is recommended.

The Javadoc of `SourceFunction` explains clearly (with examples) how to write the `run` and `cancel` methods. You can refer to it to write your own MySQL streaming source.
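
The `run`/`cancel` pattern can be sketched roughly as follows. This is a minimal illustration, not the Javadoc example itself. So that it compiles without Flink on the classpath, `SourceFunction` and `SourceContext` below are small local stand-ins that only mirror the shape of Flink's interfaces (the real `SourceContext` has more methods), and `CounterSource` and `CounterSourceDemo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins mirroring the shape of Flink's API so the sketch compiles
// on its own. In a real job, use the interfaces from
// org.apache.flink.streaming.api.functions.source instead.
interface SourceContext<T> {
    void collect(T element);
    Object getCheckpointLock();
}

interface SourceFunction<T> {
    void run(SourceContext<T> ctx) throws Exception;
    void cancel();
}

// Hypothetical source that emits an increasing counter until it is cancelled.
class CounterSource implements SourceFunction<Long> {
    // volatile because cancel() is called from a different thread than run().
    private volatile boolean running = true;
    private long next = 0;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        // The source stays alive for as long as this loop does not return.
        while (running) {
            // Emit and advance the position under the checkpoint lock.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next);
                next++;
            }
            Thread.sleep(5); // stand-in for waiting on new data
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

class CounterSourceDemo {
    // Runs the source on a worker thread until it has emitted at least
    // minElements values, then cancels it and returns what was emitted.
    static List<Long> runUntil(int minElements) {
        final List<Long> out = new ArrayList<>();
        final Object lock = new Object();
        final SourceContext<Long> ctx = new SourceContext<Long>() {
            @Override public void collect(Long element) { out.add(element); }
            @Override public Object getCheckpointLock() { return lock; }
        };
        final CounterSource source = new CounterSource();
        Thread worker = new Thread(() -> {
            try {
                source.run(ctx);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        worker.start();
        try {
            while (true) {
                synchronized (lock) {
                    if (out.size() >= minElements) break;
                }
                Thread.sleep(5);
            }
            source.cancel(); // run() sees the flag and returns
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runUntil(5));
    }
}
```

The point of the sketch is that `run` only returns once `cancel` has flipped the flag, which is what keeps a source from finishing after one pass over the data.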

In reply to Soheil Pourbafrani <[hidden email]>, Tuesday, July 16, 2019, 7:29 AM.

Re:Re: Creating a Source function to read data continuously

Haibo Sun
Hi, Soheil

As Caizhi said, to create a source that implements `SourceFunction`, you can first take a closer look at the example in the Javadoc (https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html). Although `InputFormat` is not recommended for implementing a streaming source, it can read data continuously. As for the job finishing after all the data has been read, I think that comes from your implementation. In addition, a custom source can also implement or extend `RichSourceFunction`, `ParallelSourceFunction`, `RichParallelSourceFunction`, etc.

I don't know how you will achieve continuous reading. Maybe you can also look at the implementation of `ContinuousFileMonitoringFunction`: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java

I hope this will help you.

Best,
Haibo

In reply to Caizhi Weng <[hidden email]>, 2019-07-16 10:12:21.

Re: Creating a Source function to read data continuously

Biao Liu
In reply to this post by Caizhi Weng
Hi Soheil,

I assume you are using the `DataStream` API. Please check the documentation [1] for more information. The others have already covered this in detail.

Regardless of the interface, I'm wondering how you would read a MySQL table "continuously".
Kafka can be used as a message queue, which makes it convenient to fetch incremental messages. How do you plan to do that with a MySQL table? Through the binary log?
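
One way to approximate incremental reads without the binary log, assuming the table has a monotonically increasing key such as an auto-increment `id`, is to poll repeatedly and remember the highest key already emitted. The sketch below only illustrates that bookkeeping under those assumptions: `EventRow`, `IncrementalPoller`, and the table and column names in the comment are hypothetical, and an in-memory list stands in for the query result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical row type: an auto-increment id plus one payload column.
class EventRow {
    final long id;
    final String payload;

    EventRow(long id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}

// Remembers the highest id already emitted and returns only newer rows.
class IncrementalPoller {
    // In a real source this position would need to be part of checkpointed state.
    private long lastSeenId = 0;

    // `table` stands in for the result of a query such as
    //   SELECT id, payload FROM events WHERE id > ? ORDER BY id
    // (table and column names are made up for this sketch).
    List<EventRow> poll(List<EventRow> table) {
        List<EventRow> fresh = new ArrayList<>();
        for (EventRow row : table) {
            if (row.id > lastSeenId) {
                fresh.add(row);
            }
        }
        // Emit in key order and advance the high-water mark to the last row.
        fresh.sort((a, b) -> Long.compare(a.id, b.id));
        if (!fresh.isEmpty()) {
            lastSeenId = fresh.get(fresh.size() - 1).id;
        }
        return fresh;
    }
}

class IncrementalPollerDemo {
    public static void main(String[] args) {
        List<EventRow> table = new ArrayList<>();
        IncrementalPoller poller = new IncrementalPoller();

        table.add(new EventRow(1, "a"));
        table.add(new EventRow(2, "b"));
        System.out.println(poller.poll(table).size()); // first poll returns both rows

        table.add(new EventRow(3, "c"));
        System.out.println(poller.poll(table).size()); // second poll returns only the new row
    }
}
```

Polling by key only picks up newly inserted rows. Updates and deletes to existing rows are invisible to it, which is where reading the binary log would be needed instead.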


