The implementation of the RichSinkFunction is not serializable.

Federico D'Ambrosio
Hi,

I'm trying to write to HBase with writeOutputFormat, using a custom HBase output format inspired by this example in flink-hbase (note that I'm using Scala instead of Java), and I'm running into the error in the subject line.

Now, the OutputFormat I'm using is the following:

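// Imports inferred from the types used below (Path assumed to be Flink's org.apache.flink.core.fs.Path)
import java.io.IOException
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Table}
import org.slf4j.LoggerFactory

abstract class HBaseOutputFormat[T](tableDescriptor: HTableDescriptor, confPath: Path) extends OutputFormat[T] {

  private val LOG = LoggerFactory.getLogger(this.getClass)

  var conf: org.apache.hadoop.conf.Configuration = _
  var connection: Connection = _
  var table: Table = _
  var taskNumber: String = _

  // Loads the HBase configuration and opens the connection
  @throws[IOException]
  def configure(parameters: Configuration): Unit = {
    conf = HBaseConfiguration.create()
    conf.addResource(confPath.getPath)
    connection = ConnectionFactory.createConnection(conf)
  }

  // Releases the table and the connection opened in configure()
  @throws[IOException]
  def close(): Unit = {
    table.close()
    connection.close()
  }

  // Creates the table if it does not exist yet, then opens it for this task
  @throws[IOException]
  def open(taskNumber: Int, numTasks: Int): Unit = {
    this.taskNumber = String.valueOf(taskNumber)
    val admin = connection.getAdmin
    if (!admin.tableExists(tableDescriptor.getTableName))
      admin.createTable(tableDescriptor)
    admin.close()
    table = connection.getTable(tableDescriptor.getTableName)
  }
}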

which is extended by the format I actually use, the one that implements the writeRecord method:


class HBaseBatchFormat(tableDescriptor: HTableDescriptor, confPath: Path)
  extends HBaseOutputFormat[BatchContainer](tableDescriptor, confPath)

with BatchContainer defined as:

case class BatchContainer(batch: Iterable[(String, String, String, Int)]) extends Serializable

My question is: what exactly needs to be Serializable? As far as I can see, conf, connection and table are not Serializable, so they are surely part of the issue. Do the constructor parameters count as well, especially tableDescriptor, which is not Serializable? Should all the methods implemented from the OutputFormat interface use only Serializable variables?

Thank you for your attention,
Federico
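To make the question more concrete, here is a small self-contained Scala sketch of which parts of such a class end up being serialized. It assumes, as far as I know correctly, that Flink checks the sink with plain Java serialization (when an OutputFormat is used as a DataStream sink it is wrapped in a RichSinkFunction, which would explain the wording of the error). The Scala detail that matters: a constructor parameter that is referenced inside a method, as tableDescriptor is in open(), is kept as a private field and serialized with the object, whereas var fields that are still null at that point cause no trouble. That should hold for conf, connection and table, provided they are only assigned in configure()/open(), which, as far as I understand, run on the workers after the format has been shipped. Descriptor, ParamUsedInConstructor, ParamUsedInMethod and SerializationCheck are hypothetical names; Descriptor stands in for HTableDescriptor.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a class that is not Serializable, such as HTableDescriptor.
final class Descriptor(val name: String)

// `d` is only read while constructing the object, so scalac keeps no field for it.
final class ParamUsedInConstructor(d: Descriptor) extends Serializable {
  val tableName: String = d.name
}

// `d` is read inside a method, so scalac stores it in a private field that is
// serialized with the object, just like tableDescriptor used in open().
final class ParamUsedInMethod(d: Descriptor) extends Serializable {
  // A field of a non-serializable type is fine while it is still null,
  // like conf, connection and table before configure() runs.
  var connection: AnyRef = null

  def open(): String = d.name
}

object SerializationCheck {
  // Writes the object with plain Java serialization and reports whether that succeeded.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Here isSerializable(new ParamUsedInConstructor(new Descriptor("t"))) returns true, while isSerializable(new ParamUsedInMethod(new Descriptor("t"))) returns false because of the captured parameter.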

Re: The implementation of the RichSinkFunction is not serializable.

Jörn Franke
It looks like, in your case, everything needs to be serializable. An alternative would be to mark certain non-serializable fields as transient, but as far as I can see this is not possible in your case.

(In reply to Federico D'Ambrosio's message of 27 Aug 2017, 11:02.)
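A minimal sketch of what the transient suggestion does in Scala, again assuming plain Java serialization: a field annotated with @transient is skipped when the object is written and comes back as null, so it has to be re-created in configure() or open(). FakeConnection, TransientFormat and RoundTrip are hypothetical names; FakeConnection stands in for an HBase Connection. Marking tableDescriptor itself as transient would only move the problem, since it would then arrive as null in open(), where it is still needed.

```scala
import java.io._

// Hypothetical stand-in for a resource that is not Serializable, such as an HBase Connection.
final class FakeConnection(val url: String)

// Hypothetical format: `url` is a String, so capturing it as a field is fine.
final class TransientFormat(url: String) extends Serializable {
  // @transient makes Java serialization skip this field; it is null after deserialization.
  @transient var connection: FakeConnection = null

  // Re-creates the resource on the worker, as configure()/open() would.
  def open(): Unit = {
    connection = new FakeConnection(url)
  }
}

object RoundTrip {
  // Serializes and deserializes `obj` with plain Java serialization.
  def apply[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes with the loader that defined this code (helps in REPLs and scripts).
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, getClass.getClassLoader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Even after open() has set connection, RoundTrip succeeds, and the copy has connection == null until its own open() is called. Without @transient the same round trip would fail with a NotSerializableException.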

Re: The implementation of the RichSinkFunction is not serializable.

Federico D'Ambrosio
Hi,

Could you elaborate, please? Do you mean that marking conf, connection and table as transient wouldn't help because of the HTableDescriptor reference?

(In reply to Jörn Franke's message of 27 Aug 2017, 12:44 GMT+02:00.)

Re: The implementation of the RichSinkFunction is not serializable.

Federico D'Ambrosio
Hello everyone,

I solved the issue by passing an Array[Byte] to the constructor instead of the HTableDescriptor itself. This way I can rebuild the HTableDescriptor inside the OutputFormat's open method with the static method HTableDescriptor.parseFrom. In the end, marking conf, table and connection as transient made no difference.

Regards

(Following up on Federico D'Ambrosio's message of 27 Aug 2017, 14:22 GMT+02:00.)
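The fix above can be sketched without HBase on the classpath. With the real classes, the caller would pass tableDescriptor.toByteArray (a method of HTableDescriptor in HBase 1.x) to the constructor, and open() would rebuild the descriptor with HTableDescriptor.parseFrom. In the sketch, TableSchema is a hypothetical stand-in for HTableDescriptor that is not Serializable but converts itself to and from bytes; BytesBackedFormat and JavaSer are hypothetical names as well. Only the Array[Byte] is captured as a field, and byte arrays are serializable.

```scala
import java.io._
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for HTableDescriptor: not Serializable, but it can
// convert itself to bytes and be rebuilt from them.
final class TableSchema(val tableName: String) {
  def toByteArray: Array[Byte] = tableName.getBytes(StandardCharsets.UTF_8)
}

object TableSchema {
  def parseFrom(bytes: Array[Byte]): TableSchema =
    new TableSchema(new String(bytes, StandardCharsets.UTF_8))
}

// Only the byte array is captured as a field, and Array[Byte] is serializable.
final class BytesBackedFormat(schemaBytes: Array[Byte]) extends Serializable {
  // Marked @transient for clarity; it is null until open() runs anyway.
  @transient private var schema: TableSchema = null

  // Mirrors OutputFormat.open: rebuild the descriptor after the format has been shipped.
  def open(taskNumber: Int, numTasks: Int): Unit = {
    schema = TableSchema.parseFrom(schemaBytes)
  }

  def tableName: String = schema.tableName
}

object JavaSer {
  // Serializes and deserializes `obj` with plain Java serialization.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes with the loader that defined this code (helps in REPLs and scripts).
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, getClass.getClassLoader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

A format built from new TableSchema("events").toByteArray survives JavaSer.roundTrip, and after open(0, 1) its tableName is "events". The schema field is still null when the format is serialized, which is consistent with the transient markers making no difference.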