HBase TableOutputFormat

HBase TableOutputFormat

Flavio Pompermaier
Hi guys,

I was trying to insert into an HBase table with Flink 0.8.1, and it seems to be impossible without creating a custom version of the HBase TableOutputFormat that specializes Mutation to Put.
This is my code using the standard Flink APIs:

myds.output(new HadoopOutputFormat<Text, Put>(new TableOutputFormat<Text>(), job));

and this is the Exception I get:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Interfaces and abstract classes are not valid types: class org.apache.hadoop.hbase.client.Mutation
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877)
....

So I had to copy the TableOutputFormat, rename it HBaseTableOutputFormat, and change Mutation to Put as the TableOutputFormat type argument.
However, the table field is not initialized because setConf() is never called. Is this a bug in the HadoopOutputFormat wrapper, which does not check whether the outputFormat is an instance of Configurable and call setConf() (as happens for the inputSplit)?

Best,
Flavio
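The type problem can be sketched in plain Java without HBase or Flink on the classpath. The classes below are hypothetical stand-ins, not the real APIs; the point is that a reflection-based type extractor recovers whatever type argument the format class pins, so a copy that pins the concrete Put instead of the abstract Mutation is what makes extraction succeed:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypePinningSketch {

    // Hypothetical stand-ins for the HBase classes, for illustration only.
    abstract static class Mutation {}
    static class Put extends Mutation {}

    abstract static class OutputFormat<K, V> {}

    // Mirrors HBase's TableOutputFormat<KEY>: the value type is hard-wired
    // to the abstract Mutation, which a reflective type extractor rejects.
    static class TableOutputFormat<K> extends OutputFormat<K, Mutation> {}

    // The workaround described above: a copied format whose value type
    // is pinned to the concrete Put subclass.
    static class HBaseTableOutputFormat<K> extends OutputFormat<K, Put> {}

    // Recover the value type argument the way a reflection-based
    // extractor would: from the generic superclass declaration.
    static Type valueType(Class<?> formatClass) {
        ParameterizedType p = (ParameterizedType) formatClass.getGenericSuperclass();
        return p.getActualTypeArguments()[1];
    }

    public static void main(String[] args) {
        // The original format exposes only the abstract Mutation...
        System.out.println(valueType(TableOutputFormat.class));
        // ...while the copy exposes the concrete Put.
        System.out.println(valueType(HBaseTableOutputFormat.class));
    }
}
```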

Re: HBase TableOutputFormat

Flavio Pompermaier
To make it work I had to clone the Flink repo, import the Flink-java project, and modify HadoopOutputFormatBase in open() and finalizeGlobal() to call

if (this.mapreduceOutputFormat instanceof Configurable) {
    ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
}

otherwise the "mapred.output.dir" property was always null :(
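For context, the patch boils down to a standard Hadoop contract: a caller must invoke setConf() on anything that implements Configurable before using it. A self-contained sketch of that pattern (the Configuration, Configurable, and format classes below are simplified stand-ins, not the real Hadoop/Flink types):

```java
import java.util.HashMap;

public class ConfigurableSketch {

    // Simplified stand-in for org.apache.hadoop.conf.Configuration.
    static class Configuration extends HashMap<String, String> {}

    // Simplified stand-in for org.apache.hadoop.conf.Configurable.
    interface Configurable {
        void setConf(Configuration conf);
    }

    interface OutputFormat {
        void open();
    }

    // Like HBase's TableOutputFormat: unusable until setConf() has run.
    static class TableLikeFormat implements OutputFormat, Configurable {
        Configuration conf;
        public void setConf(Configuration conf) { this.conf = conf; }
        public void open() {
            if (conf == null) {
                throw new IllegalStateException("setConf() was never called");
            }
        }
    }

    // The wrapper fix from the thread: check for Configurable before use.
    static class HadoopOutputFormatWrapper {
        final OutputFormat wrapped;
        final Configuration configuration = new Configuration();

        HadoopOutputFormatWrapper(OutputFormat wrapped) { this.wrapped = wrapped; }

        void open() {
            if (wrapped instanceof Configurable) {
                ((Configurable) wrapped).setConf(configuration);
            }
            wrapped.open();
        }
    }

    public static void main(String[] args) {
        // Without the instanceof check, open() would throw.
        new HadoopOutputFormatWrapper(new TableLikeFormat()).open();
    }
}
```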


Re: HBase TableOutputFormat

Stephan Ewen
Hi Flavio!

Is this on Flink 0.9-SNAPSHOT or 0.8.1 ?

Stephan



Re: HBase TableOutputFormat

Flavio Pompermaier
0.8.1


Re: HBase TableOutputFormat

Stephan Ewen
Hi Flavio!

The issue that abstract classes and interfaces are not supported is definitely fixed in 0.9.

Your other fix (adding the call that configures the output format): is it always needed, or only important in a special case? How did the output format work before?

If this is critical to the functionality, would you open a pull request with this patch?

Greetings,
Stephan



Re: HBase TableOutputFormat

Flavio Pompermaier
Hi Stephan,
the problem occurs when you try to write into HBase with the HadoopOutputFormat.
Unfortunately, the recordWriter of the HBase TableOutputFormat requires a Table object that is instantiated through the setConf() method (otherwise you get a NullPointerException), and setConf() also sets other parameters in the passed conf. Thus I think that Hadoop OutputFormats implementing Configurable should be initialized somewhere with a call to setConf(), as I tried to do.
Moreover, there's the problem that the Flink Hadoop OutputFormat requires a property in the configuration (mapred.output.dir) that is currently impossible to set. Or am I doing something wrong? Could you try to write some data to an HBase TableOutputFormat and verify this problem?

Thanks again,
Flavio



Re: HBase TableOutputFormat

Flavio Pompermaier
Any news about this? Could someone look into the problem, or should I open a ticket in JIRA?


Re: HBase TableOutputFormat

Fabian Hueske-2
Creating a JIRA issue never hurts.
Have you tried adding your code snippet to the HadoopOutputFormatBase.configure() method? That seems to me the right place for it.

Do you want to open a PR for that?


Re: HBase TableOutputFormat

Flavio Pompermaier

No, I haven't. There are some points that are not clear to me:

1) Why do the parameters I set in the job configuration get lost on the way to the job and task managers?
2) Do you think I should put the setConf() call in the configure() method? What is the lifecycle of the OutputFormat?
3) Is it really necessary to set mapreduce.output.dir? Is that a standard approach for Hadoop compatibility?
4) Is it better to make a PR against Flink 0.9 or 0.8.2? When are they going to be released (more or less)?

Best,
Flavio


Re: HBase TableOutputFormat

Fabian Hueske-2
Hi Flavio,

1) The parameters you set on the configuration object in the main method should be available in the JM and TMs. The OutputFormat object is serialized on the client side, sent to the JM and the TMs, and deserialized there. Therefore, all information that was set in main() should be present in the JM and TMs.
2) configure() is always called before the OutputFormat is used (on the JM and TMs). This would be the right place to put your code to configure the wrapped Hadoop OF.
3) I think this is only necessary for file-based wrapped Hadoop OFs. The HBase format should not require it.
4) This should be fixed in 0.9 and 0.8.2, IMO.

Cheers, Fabian
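Fabian's point 2 can be illustrated as a call-order contract: configure() runs before open() on every task, so doing the Configurable check there guarantees the wrapped format is configured before its record writer exists. A minimal sketch, with stand-in classes rather than the real Flink/Hadoop APIs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfigureLifecycleSketch {

    // Records the order in which lifecycle methods run.
    static final List<String> calls = new ArrayList<>();

    // Stand-in for org.apache.hadoop.conf.Configurable.
    interface Configurable {
        void setConf(Map<String, String> conf);
    }

    // Stand-in for the wrapped Hadoop format (e.g. the HBase one).
    static class HBaseLikeFormat implements Configurable {
        Map<String, String> conf;
        public void setConf(Map<String, String> conf) {
            this.conf = conf;
            calls.add("setConf");
        }
    }

    // Stand-in wrapper: the runtime invokes configure() before open(),
    // so the Configurable check in configure() runs before the record
    // writer would be created in open().
    static class Wrapper {
        final Object wrapped;
        final Map<String, String> configuration = new HashMap<>();

        Wrapper(Object wrapped) { this.wrapped = wrapped; }

        void configure() {
            calls.add("configure");
            if (wrapped instanceof Configurable) {
                ((Configurable) wrapped).setConf(configuration);
            }
        }
        void open() { calls.add("open"); }
        void close() { calls.add("close"); }
    }

    public static void main(String[] args) {
        Wrapper w = new Wrapper(new HBaseLikeFormat());
        w.configure();
        w.open();
        w.close();
        System.out.println(calls);
    }
}
```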
