Hi guys,
I was trying to insert into an HBase table with Flink 0.8.1 and it seems not to be possible without creating a custom version of the HBase TableOutputFormat that specializes Mutation to Put. This is my code using the standard Flink APIs:

    myds.output(new HadoopOutputFormat<Text, Put>(new TableOutputFormat<Text>(), job));

and this is the exception I get:

    Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException:
    Interfaces and abstract classes are not valid types: class org.apache.hadoop.hbase.client.Mutation
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877)
        ...

So I had to copy the TableOutputFormat, rename it HBaseTableOutputFormat, and change Mutation to Put as the TableOutputFormat type argument. However, the table field is never initialized because setConf() is not called. Is this a bug in the HadoopOutputFormat wrapper, namely that it does not check whether the outputFormat is an instance of Configurable and call setConf() on it (as happens for the InputSplit)?
Best,
Flavio
|
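For context, below is a heavily condensed, hypothetical sketch of what such a copied class can look like: essentially HBase's TableOutputFormat with the value type narrowed from the abstract Mutation to the concrete Put. It omits the error handling and write-buffer options of the real class and assumes the HBase client API of that era; the point is only that the HTable is created inside setConf(), which is why the format breaks if setConf() is never called.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // Condensed sketch: HBase's TableOutputFormat with the value type narrowed from the
    // abstract Mutation to the concrete Put. Error handling and extra options are omitted.
    public class HBaseTableOutputFormat<KEY> extends OutputFormat<KEY, Put> implements Configurable {

        public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

        private Configuration conf;
        private HTable table;

        @Override
        public void setConf(Configuration otherConf) {
            this.conf = HBaseConfiguration.create(otherConf);
            try {
                // This is the initialization that never happens when nobody calls setConf():
                // 'table' stays null and the record writer fails with a NullPointerException.
                this.table = new HTable(this.conf, this.conf.get(OUTPUT_TABLE));
                this.table.setAutoFlush(false, true);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public Configuration getConf() {
            return conf;
        }

        @Override
        public RecordWriter<KEY, Put> getRecordWriter(TaskAttemptContext context) {
            return new RecordWriter<KEY, Put>() {
                @Override
                public void write(KEY key, Put value) throws IOException {
                    table.put(new Put(value));
                }

                @Override
                public void close(TaskAttemptContext ctx) throws IOException {
                    table.flushCommits();
                    table.close();
                }
            };
        }

        @Override
        public void checkOutputSpecs(JobContext context) {
            // nothing to check, as in the original TableOutputFormat
        }

        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
            return new TableOutputCommitter();
        }
    }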
To make it work I had to clone the Flink repo, import the flink-java project, and modify HadoopOutputFormatBase: in open() and finalizeGlobal() I added

    if (this.mapreduceOutputFormat instanceof Configurable) {
        ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
    }

otherwise the "mapred.output.dir" property was always null :(

On Fri, Mar 20, 2015 at 10:27 AM, Flavio Pompermaier <[hidden email]> wrote:
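For reference, the same check can also be written as a small self-contained helper. This is only a sketch of the proposed pattern with invented names, not the actual Flink patch; it mirrors what Hadoop's own ReflectionUtils.setConf() does for Configurable objects.

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;

    // Sketch of the proposed pattern (invented names, not Flink code): hand a wrapped
    // Hadoop OutputFormat its Configuration if it asks for one by implementing Configurable.
    public final class HadoopConfigurableHelper {

        private HadoopConfigurableHelper() {
        }

        /** Calls setConf() on the given object if and only if it is Configurable. */
        public static void setConfIfConfigurable(Object hadoopFormat, Configuration conf) {
            if (hadoopFormat instanceof Configurable) {
                ((Configurable) hadoopFormat).setConf(conf);
            }
        }
    }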
|
Hi Flavio!
Is this on Flink 0.9-SNAPSHOT or 0.8.1?
Stephan

On Fri, Mar 20, 2015 at 6:03 PM, Flavio Pompermaier <[hidden email]> wrote:
|
0.8.1
On Fri, Mar 20, 2015 at 6:11 PM, Stephan Ewen <[hidden email]> wrote:
|
Hi Flavio!
The issue that abstract classes and interfaces are not supported is definitely fixed in 0.9.
Your other fix (adding the call for configuring the output format) - is that always needed, or just important in a special case? How has the output format worked before? If this is critical to the functionality, would you open a pull request with this patch?
Greetings,
Stephan

On Fri, Mar 20, 2015 at 6:28 PM, Flavio Pompermaier <[hidden email]> wrote:
|
Hi Stephan,
the problem arises when you try to write into HBase with the HadoopOutputFormat. Unfortunately the RecordWriter of the HBase TableOutputFormat requires a Table object that is instantiated inside the setConf() method (otherwise you get a NullPointerException), and setConf() also sets other parameters in the passed conf. Thus I think that Hadoop OutputFormats implementing Configurable should be initialized somewhere with a call to setConf(), as I tried to do.
Moreover, there is the problem that the Flink HadoopOutputFormat requires a property (mapred.output.dir) to be set in the configuration, which right now is not possible to set. Or am I doing something wrong? Could you try to write some data to an HBase TableOutputFormat and verify this problem?
Thanks again,
Flavio

On Sat, Mar 21, 2015 at 8:16 PM, Stephan Ewen <[hidden email]> wrote:
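To make the description above concrete, here is a hedged sketch of the client-side Job setup it implies. The table name "mytable" and the dummy output path are hypothetical placeholders, and the mapred.output.dir line is only the workaround discussed in this thread, not something HBase itself needs.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;

    // Hypothetical client-side setup; "mytable" and the dummy output path are placeholders.
    public class HBaseSinkJobSetup {

        public static Job createJob() throws Exception {
            Job job = Job.getInstance(HBaseConfiguration.create());
            // Read by TableOutputFormat.setConf() to open the HTable:
            job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytable");
            // Workaround discussed in this thread: the Flink wrapper expects an output
            // directory even though HBase never writes files to it.
            job.getConfiguration().set("mapred.output.dir", "/tmp/hbase-dummy-out");
            return job;
        }
    }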
|
Any news about this? Could someone look into the problem or should I open a ticket in JIRA?
On Sun, Mar 22, 2015 at 12:09 PM, Flavio Pompermaier <[hidden email]> wrote:
|
Creating a JIRA issue never hurts. Have you tried to add your code snippet to the HadoopOutputFormatBase.configure() method? Seems to me the right place for it.

2015-03-23 16:01 GMT+01:00 Flavio Pompermaier <[hidden email]>:
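As background for why configure() is a natural place: Flink calls configure() on each parallel sink instance before open(). The tiny, hypothetical OutputFormat below only prints the lifecycle calls to illustrate that order; it is not an HBase sink.

    import java.io.IOException;

    import org.apache.flink.api.common.io.OutputFormat;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical, log-only OutputFormat that just shows the call order of the lifecycle
    // methods on each parallel sink instance: configure() first, then open(), writeRecord()
    // per element, and finally close().
    public class LifecycleLoggingOutputFormat<T> implements OutputFormat<T> {

        @Override
        public void configure(Configuration parameters) {
            System.out.println("configure() - called before open(), a natural place for setConf()");
        }

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            System.out.println("open() - subtask " + taskNumber + " of " + numTasks);
        }

        @Override
        public void writeRecord(T record) throws IOException {
            System.out.println("writeRecord(): " + record);
        }

        @Override
        public void close() throws IOException {
            System.out.println("close()");
        }
    }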
|
No, I haven't. There are some points that are not clear to me:
1) Why do the parameters I set in the job configuration get lost by the time they arrive at the job and task managers?
Best,
|
Hi Flavio,
1) The parameters you set on the configuration object in the main method should be available in the JM and TMs. The OutputFormat object is serialized on the client side, sent to the JM and the TMs, and deserialized there. Therefore, all information that was set in main() should be available in the JM and TMs.

2015-03-23 22:19 GMT+01:00 Flavio Pompermaier <[hidden email]>:
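A hedged illustration of that explanation, using the plain Hadoop TextOutputFormat to keep it short: whatever is put into the Job's Configuration on the client is serialized together with the Flink wrapper and is therefore visible again on the JobManager and TaskManagers. The property name, path, and class name are made up, and the wrapper's import path assumes the Flink 0.8.x hadoop-compatibility module (newer versions relocate this class).

    import org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat; // Flink 0.8.x location (assumption)
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    // Hypothetical example: the property, path, and class names are made up.
    public class ConfigTravelsWithTheFormat {

        public static HadoopOutputFormat<Text, IntWritable> buildSink() throws Exception {
            Job job = Job.getInstance();
            // Set on the client in main(); Flink serializes the wrapper below together with
            // this Configuration, so the same value is readable on the JM and the TMs.
            job.getConfiguration().set("my.custom.key", "my-value");
            TextOutputFormat.setOutputPath(job, new Path("/tmp/flink-out"));
            return new HadoopOutputFormat<Text, IntWritable>(
                    new TextOutputFormat<Text, IntWritable>(), job);
        }
    }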
|