Hello,
Is it possible to use existing Hadoop Input and OutputFormats with Flink? There's a lot of existing code that conforms to these interfaces, seems a shame to have to re-implement it all. Perhaps some adapter shim..? Thanks, Nick |
Hi Nick,
You can use Hadoop Input/Output Format without modification! Please check the documentation[1] in Flink homepage. [1] https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk <[hidden email]> wrote: > > Hello, > > Is it possible to use existing Hadoop Input and OutputFormats with Flink? There's a lot of existing code that conforms to these interfaces, seems a shame to have to re-implement it all. Perhaps some adapter shim..? > > Thanks, > Nick Regards, Chiwan Park |
I completely missed this, thanks Chiwan. Can these be used with DataStreams as well as DataSets? On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park <[hidden email]> wrote: Hi Nick, |
I’m not streaming expert. AFAIK, the layer can be used with only DataSet. There are some streaming-specific features such as distributed snapshot in Flink. These need some supports of source and sink. So you have to implement I/O.
> On Nov 25, 2015, at 3:22 AM, Nick Dimiduk <[hidden email]> wrote: > > I completely missed this, thanks Chiwan. Can these be used with DataStreams as well as DataSets? > > On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park <[hidden email]> wrote: > Hi Nick, > > You can use Hadoop Input/Output Format without modification! Please check the documentation[1] in Flink homepage. > > [1] https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html > > > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk <[hidden email]> wrote: > > > > Hello, > > > > Is it possible to use existing Hadoop Input and OutputFormats with Flink? There's a lot of existing code that conforms to these interfaces, seems a shame to have to re-implement it all. Perhaps some adapter shim..? > > > > Thanks, > > Nick > > Regards, > Chiwan Park > > Regards, Chiwan Park |
Hi Nick, you can use Flink's HadoopInputFormat wrappers also for the DataStream API. However, DataStream does not offer as much "sugar" as DataSet because StreamEnvironment does not offer dedicated createHadoopInput or readHadoopFile methods.val textData: DataStream[(LongWritable, Text)] = env.createInput( new HadoopInputFormat[LongWritable, Text]( new TextInputFormat, classOf[LongWritable], classOf[Text], new JobConf() )) The Java version is very similar. Note: Flink has wrappers for both MR APIs: mapred and mapreduce. Cheers, Fabian 2015-11-24 19:36 GMT+01:00 Chiwan Park <[hidden email]>: I’m not streaming expert. AFAIK, the layer can be used with only DataSet. There are some streaming-specific features such as distributed snapshot in Flink. These need some supports of source and sink. So you have to implement I/O. |
Guess, it makes sense to add readHadoopXXX() methods to StreamExecutionEnvironment (for feature parity with what's existing presently in ExecutionEnvironment).
Also Flink-2949 addresses the need to add relevant syntactic sugar wrappers in DataSet api for the code snippet in Fabian's previous email. Its not cool, having to instantiate a JobConf in client code and having to pass that around. On Tue, Nov 24, 2015 at 2:26 PM, Fabian Hueske <[hidden email]> wrote:
|
Thanks for correction @Fabian. :)
> On Nov 25, 2015, at 4:40 AM, Suneel Marthi <[hidden email]> wrote: > > Guess, it makes sense to add readHadoopXXX() methods to StreamExecutionEnvironment (for feature parity with what's existing presently in ExecutionEnvironment). > > Also Flink-2949 addresses the need to add relevant syntactic sugar wrappers in DataSet api for the code snippet in Fabian's previous email. Its not cool, having to instantiate a JobConf in client code and having to pass that around. > > > > On Tue, Nov 24, 2015 at 2:26 PM, Fabian Hueske <[hidden email]> wrote: > Hi Nick, > > you can use Flink's HadoopInputFormat wrappers also for the DataStream API. However, DataStream does not offer as much "sugar" as DataSet because StreamEnvironment does not offer dedicated createHadoopInput or readHadoopFile methods. > > In DataStream Scala you can read from a Hadoop InputFormat (TextInputFormat in this case) as follows: > > val textData: DataStream[(LongWritable, Text)] = env.createInput( > new HadoopInputFormat[LongWritable, Text]( > new TextInputFormat, > classOf[LongWritable], > classOf[Text], > new JobConf() > )) > > The Java version is very similar. > > Note: Flink has wrappers for both MR APIs: mapred and mapreduce. > > Cheers, > Fabian > > 2015-11-24 19:36 GMT+01:00 Chiwan Park <[hidden email]>: > I’m not streaming expert. AFAIK, the layer can be used with only DataSet. There are some streaming-specific features such as distributed snapshot in Flink. These need some supports of source and sink. So you have to implement I/O. > > > On Nov 25, 2015, at 3:22 AM, Nick Dimiduk <[hidden email]> wrote: > > > > I completely missed this, thanks Chiwan. Can these be used with DataStreams as well as DataSets? > > > > On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park <[hidden email]> wrote: > > Hi Nick, > > > > You can use Hadoop Input/Output Format without modification! Please check the documentation[1] in Flink homepage. > > > > [1] https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html > > > > > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk <[hidden email]> wrote: > > > > > > Hello, > > > > > > Is it possible to use existing Hadoop Input and OutputFormats with Flink? There's a lot of existing code that conforms to these interfaces, seems a shame to have to re-implement it all. Perhaps some adapter shim..? > > > > > > Thanks, > > > Nick > > > > Regards, > > Chiwan Park > > > > > > Regards, > Chiwan Park > Regards, Chiwan Park |
For streaming, I am a bit torn whether reading a file will should have so many such prominent functions. Most streaming programs work on message queues, or on monitored directories. Not saying no, but not sure DataSet/DataStream parity is the main goal - they are for different use cases after all... On Wed, Nov 25, 2015 at 8:22 AM, Chiwan Park <[hidden email]> wrote: Thanks for correction @Fabian. :) |
I agree with Stephan. Reading static files is quite uncommon with the DataStream API. Before We add such a method, we should add a convenience method for Kafka ;) But in general, I'm not a big fan of adding too many of these methods because they pull in so many external classes, which lead to breaking API changes, dependency issues etc. I think such issues can be addressed easily with a good documentation (maybe in the "Best practices" guide), good answers on Stack Overflow and so on. On Wed, Nov 25, 2015 at 12:12 PM, Stephan Ewen <[hidden email]> wrote:
|
Thanks for the comments everyone. For my part, i'm interested most in using Hadoop's OutputFormats for writing out data at the end of a streaming job. I also agree that while these "convenience methods" make for good example code in slide decks, they're often not helpful for "real" applications. The additional maintenance burden of a bloated API tends to be counter-productive. -n On Wed, Nov 25, 2015 at 8:41 AM, Robert Metzger <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |