HDFS to Kafka

HDFS to Kafka

Dominique Rondé
Hi folks,

at first glance I have a very simple problem. I would like to get datasets
out of some text files in HDFS and send them to a Kafka topic. I use the
following code to do that:

DataStream<String> hdfsDatasource = env.readTextFile("hdfs://"
        + parameterTool.getRequired("hdfs_env") + "/user/flink/"
        + parameterTool.getRequired("hdfs_path") + "/");

hdfsDatasource.addSink(new FlinkKafkaProducer08<String>(
        parameterTool.getRequired("brokerlist"),
        parameterTool.getRequired("topic"),
        new SimpleStringSchema()));

Everything works fine. But I need a way to recurse through the source
folder and find text files in subfolders. For my batch routines this works
fine with "recursive.file.enumeration", but in the streaming environment
there is no way to pass this configuration to the readTextFile method.
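
For reference, the working batch version looks roughly like this (a
sketch; env here is the batch ExecutionEnvironment and parameterTool is
the same as above):

Configuration parameters = new Configuration();
// enable recursive traversal of nested input directories (DataSet API)
parameters.setBoolean("recursive.file.enumeration", true);

DataSet<String> lines = env.readTextFile("hdfs://"
        + parameterTool.getRequired("hdfs_env") + "/user/flink/"
        + parameterTool.getRequired("hdfs_path") + "/")
        .withParameters(parameters);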

Can someone give me a hint?

Cheers

Dominique

Re: HDFS to Kafka

Robert Metzger
Hi Dominique,

In Flink 1.1 we've reworked the reading of static files in the DataStream API.
There is now a method for passing any FileInputFormat: readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo).
I guess you can pass a FileInputFormat with the recursive enumeration enabled there.
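
A rough sketch of what that could look like (untested; env, parameterTool,
and the path are taken from your snippet, and how to actually switch on
recursive enumeration for the format is the open part):

String inputPath = "hdfs://" + parameterTool.getRequired("hdfs_env")
        + "/user/flink/" + parameterTool.getRequired("hdfs_path") + "/";

// a plain text format over the input directory
TextInputFormat format = new TextInputFormat(new Path(inputPath));

DataStream<String> hdfsDatasource = env.readFile(
        format,
        inputPath,
        FileProcessingMode.PROCESS_ONCE,      // read the current contents once, then finish
        -1,                                   // scan interval; not used with PROCESS_ONCE
        FilePathFilter.createDefaultFilter(), // skips hidden files ("." and "_" prefixes)
        BasicTypeInfo.STRING_TYPE_INFO);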


Regards,
Robert


Re: HDFS to Kafka

Aljoscha Krettek
Hi,
this does not work right now because FileInputFormat does not allow setting the "enumerateNestedFiles" field directly and the Configuration is completely ignored in Flink streaming jobs.
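
One possible workaround (an untested sketch, and it assumes
enumerateNestedFiles is a protected field of FileInputFormat) would be to
subclass the format and force the flag in configure():

public class RecursiveTextInputFormat extends TextInputFormat {

    public RecursiveTextInputFormat(Path filePath) {
        super(filePath);
    }

    @Override
    public void configure(Configuration parameters) {
        super.configure(parameters);
        // force recursive enumeration, regardless of the (ignored) runtime Configuration
        this.enumerateNestedFiles = true;
    }
}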

Cheers,
Aljoscha
