Hi,
By extending the "RichInputFormat" class I was able to create my own MySQL input. I want to use it to read data continuously from a table, but I observed that the "RichInputFormat" class reads all the data and then finishes the job. I guess that for reading data continuously I need to extend "SourceFunction", but I observed that it has only two methods: run() and cancel(). So I was wondering, is it possible to implement a new class that reads data from MySQL tables continuously, like what we can do with the Kafka connector?

Thanks
Hi Soheil,

It's not recommended to implement a streaming source using `InputFormat` (it's mainly used for batch sources). To implement a streaming source, `SourceFunction` is recommended. The javadocs of `SourceFunction` clearly describe (with examples) how to write the `run` and `cancel` methods. You can refer to that to write your own MySQL streaming source.

Soheil Pourbafrani <[hidden email]> wrote on Tue, Jul 16, 2019, 7:29 AM:
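For reference, a minimal sketch of a polling `SourceFunction`-based MySQL source along the lines Caizhi describes might look like the following. The table `my_table`, the `payload` column, and the connection details are placeholders for illustration, not something from this thread:

```java
// A minimal sketch of a MySQL polling source, assuming a hypothetical table
// "my_table(id BIGINT, payload VARCHAR)". Connection details are placeholders.
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MySqlPollingSource implements SourceFunction<String> {

    // Flag checked in the run() loop; cancel() flips it to stop the source.
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mydb", "user", "password")) {
            while (running) {
                try (PreparedStatement stmt =
                             conn.prepareStatement("SELECT payload FROM my_table");
                     ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        // Emit records under the checkpoint lock for consistency.
                        synchronized (ctx.getCheckpointLock()) {
                            ctx.collect(rs.getString("payload"));
                        }
                    }
                }
                Thread.sleep(1000); // poll interval; tune as needed
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

This only shows the run()/cancel() contract; a real job would also need reconnection handling and a way to avoid re-emitting rows it has already seen.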
Hi, Soheil

As Caizhi said, to create a source that implements `SourceFunction`, you can first take a closer look at the example in the javadoc (https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html).

Although `InputFormat` is not recommended for implementing a streaming source, it can achieve continuous data reading. As for the job finishing after reading all the data, I think that is an issue with your implementation. In addition, a custom source can also implement or extend `RichSourceFunction`, `ParallelSourceFunction`, `RichParallelSourceFunction`, etc.

I don't know how you intend to achieve continuous reading. Maybe you can also look at the implementation of `ContinuousFileMonitoringFunction`: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java

I hope this helps.

Best,
Haibo

At 2019-07-16 10:12:21, "Caizhi Weng" <[hidden email]> wrote:
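As an illustration of the `RichParallelSourceFunction` option Haibo mentions, a hedged sketch could open the JDBC connection in `open()`, close it in `close()`, and split the reads across subtasks by id. The partitioning scheme, table, columns, and connection settings below are all assumptions for the example:

```java
// A sketch of a parallel variant: each subtask reads only the rows whose id
// maps to its own subtask index. Table/column names and connection details
// are placeholders, not taken from the thread.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class ParallelMySqlSource extends RichParallelSourceFunction<String> {

    private volatile boolean running = true;
    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One connection per parallel subtask, created before run() starts.
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mydb", "user", "password");
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        String query = "SELECT payload FROM my_table WHERE MOD(id, ?) = ?";
        while (running) {
            try (PreparedStatement stmt = connection.prepareStatement(query)) {
                stmt.setInt(1, parallelism);
                stmt.setInt(2, subtask);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        synchronized (ctx.getCheckpointLock()) {
                            ctx.collect(rs.getString("payload"));
                        }
                    }
                }
            }
            Thread.sleep(1000); // poll interval
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}
```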
Hi Soheil,

I assume that you are using the `DataStream` API. Please check the document [1] to get more information. The other replies have already covered the interfaces.

Regardless of the interfaces, I'm just wondering how you could read a MySQL table "continuously"? Kafka can be used as a message queue, which makes it convenient to get incremental messages. How do you plan to do that based on a MySQL table? Through the binary log?

Caizhi Weng <[hidden email]> wrote on Tue, Jul 16, 2019, 10:12 AM:
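One possible (non-binlog) way to approximate continuous reading is to poll only for rows newer than the last id already emitted, assuming the table has a monotonically increasing `id` column. This is only a sketch under that assumption; change data capture through the binary log, as raised above, is the more robust approach:

```java
// A sketch of incremental polling without the binlog: remember the last id
// seen and query only newer rows. Table/column names and connection details
// are hypothetical; for real use the lastId would belong in checkpointed state.
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class IncrementalMySqlSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long lastId = 0L; // resume point; persist it in state for fault tolerance
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mydb", "user", "password")) {
            String query = "SELECT id, payload FROM my_table WHERE id > ? ORDER BY id";
            while (running) {
                try (PreparedStatement stmt = conn.prepareStatement(query)) {
                    stmt.setLong(1, lastId);
                    try (ResultSet rs = stmt.executeQuery()) {
                        while (rs.next()) {
                            synchronized (ctx.getCheckpointLock()) {
                                ctx.collect(rs.getString("payload"));
                                lastId = rs.getLong("id");
                            }
                        }
                    }
                }
                Thread.sleep(1000); // poll interval
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```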