Re: HDFS to Kafka

Posted by rmetzger0 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/HDFS-to-Kafka-tp7932p7937.html

Hi Dominique,

In Flink 1.1 we've reworked the reading of static files in the DataStream API.
There is now a method for passing any FileInputFormat: readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo).
I guess you can pass a FileInputFormat with recursive enumeration enabled there.
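A minimal sketch of how that could look, assuming Flink 1.1's TextInputFormat picks up the "recursive.file.enumeration" flag via configure(); the path, PROCESS_ONCE mode, and default path filter here are illustrative assumptions, not taken from this thread:

```java
// Sketch only - assumes Flink 1.1 DataStream API, not verified here.
TextInputFormat format = new TextInputFormat(new Path("hdfs:///user/flink/data"));
Configuration conf = new Configuration();
conf.setBoolean("recursive.file.enumeration", true); // enumerate nested subfolders
format.configure(conf);

DataStream<String> lines = env.readFile(
        format,
        "hdfs:///user/flink/data",
        FileProcessingMode.PROCESS_ONCE, // read the static files once, then stop
        -1L,                             // watch interval unused in PROCESS_ONCE
        FilePathFilter.createDefaultFilter(),
        BasicTypeInfo.STRING_TYPE_INFO);
```

The resulting stream can then be sent to Kafka with the same addSink call as before.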


Regards,
Robert


On Tue, Jul 12, 2016 at 6:30 PM, Dominique Rondé <[hidden email]> wrote:
Hi folks,

at first glance my problem looks very simple: I'd like to read datasets out of some text files in HDFS and send them to a Kafka topic. I use the following code to do that:

DataStream<String> hdfsDatasource = env.readTextFile(
        "hdfs://" + parameterTool.getRequired("hdfs_env")
        + "/user/flink/" + parameterTool.getRequired("hdfs_path") + "/");

hdfsDatasource.addSink(new FlinkKafkaProducer08<String>(
        parameterTool.getRequired("brokerlist"),
        parameterTool.getRequired("topic"),
        new SimpleStringSchema()));

Everything works fine. But I need a way to recurse through the source folder and pick up text files in subfolders. For my batch routines this works fine with "recursive.file.enumeration", but in the streaming environment there is no way to pass this configuration to the readTextFile method.
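For reference, the batch (DataSet) variant I mean looks roughly like this (a sketch with a placeholder path, using the withParameters() mechanism the DataSet API provides):

```java
// Batch sketch - "recursive.file.enumeration" passed via withParameters()
Configuration cfg = new Configuration();
cfg.setBoolean("recursive.file.enumeration", true);

DataSet<String> data = env
        .readTextFile("hdfs:///user/flink/data")
        .withParameters(cfg);
```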

Can someone give me a hint?

Cheers

Dominique