Hi folks,
at first glance I have a very simple problem: I want to read datasets from some text files in HDFS and send them to a Kafka topic. I use the following code to do that:

    DataStream<String> hdfsDatasource = env.readTextFile("hdfs://" + parameterTool.getRequired("hdfs_env") + "/user/flink/" + parameterTool.getRequired("hdfs_path") + "/");
    hdfsDatasource.addSink(new FlinkKafkaProducer08<String>(parameterTool.getRequired("brokerlist"), parameterTool.getRequired("topic"), new SimpleStringSchema()));

Everything works fine, but I need a way to traverse the source folder recursively and find text files in subfolders. For my batch routines this works fine with "recursive.file.enumeration", but in the streaming environment it is not possible to pass this configuration to the readTextFile method. Can someone give me a hint?

Cheers
Dominique
Hi Dominique,

In Flink 1.1 we've reworked the reading of static files in the DataStream API. There is now a method for passing any FileInputFormat: readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo). I guess you can pass a FileInputFormat with recursive enumeration enabled there.

Regards,
Robert

On Tue, Jul 12, 2016 at 6:30 PM, Dominique Rondé <[hidden email]> wrote:
Hi,

this does not work right now because FileInputFormat does not allow setting the "enumerateNestedFiles" field directly, and the Configuration is completely ignored in Flink streaming jobs.

Cheers,
Aljoscha

On Wed, 13 Jul 2016 at 11:06 Robert Metzger <[hidden email]> wrote:
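For readers finding this thread later: in Flink versions where FileInputFormat exposes a public setNestedFileEnumeration method, Robert's readFile suggestion can be combined with it roughly as sketched below. This is only a sketch under that assumption; the HDFS path is a placeholder, and FileProcessingMode.PROCESS_ONCE is chosen here to mimic a one-shot read like readTextFile (use PROCESS_CONTINUOUSLY with a positive interval to keep watching the directory).

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class RecursiveHdfsRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path; in the original post it was assembled from
        // ParameterTool values (hdfs_env, hdfs_path).
        Path hdfsPath = new Path("hdfs:///user/flink/data/");

        // Build a TextInputFormat over the directory and enable
        // recursive enumeration of files in nested subfolders.
        TextInputFormat format = new TextInputFormat(hdfsPath);
        format.setNestedFileEnumeration(true);

        // readFile with PROCESS_ONCE scans the directory (including
        // subfolders, thanks to the flag above) once and then stops;
        // the interval argument is ignored in this mode.
        DataStream<String> lines = env.readFile(
                format, hdfsPath.toString(),
                FileProcessingMode.PROCESS_ONCE, -1L);

        // Sink as in the original post (e.g. a FlinkKafkaProducer)
        // would be attached here; print() keeps the sketch minimal.
        lines.print();

        env.execute("recursive-hdfs-read");
    }
}
```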