Flink1.9.1 TableFunction Unable to serialize

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink1.9.1 TableFunction Unable to serialize

Polarisary
Hi all
When I use udf, it throws Unable to serialize Exception as follows:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to serialize object 'UserTableFunction' of class ‘....udtf.UserTableFunction'.
at org.apache.flink.table.utils.EncodingUtils.encodeObjectToString(EncodingUtils.java:72)
at org.apache.flink.table.functions.UserDefinedFunction.functionIdentifier(UserDefinedFunction.java:45)
at org.apache.flink.table.planner.codegen.CodeGenUtils$.udfFieldName(CodeGenUtils.scala:715)
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:615)


My udf as follows.  
public class UserTableFunction extends TableFunction<Row> {
private static final long serialVersionUID = 1L;
private HikariCPUtils dbUtils = new HikariCPUtils();
protected Connection connection;


protected PreparedStatement preparedStatement = null;
@Override
public void open(FunctionContext context) throws Exception {
connection = dbUtils.getConnection();
}

@Override
public void close() throws Exception {
if (null != connection)
connection.close();
if (null != preparedStatement)
preparedStatement.close();
}

public void eval(long uid, int countryId) {
...

Row row =
new Row(8);
try {
...

collect(row);
}
catch (SQLException e) {
e.printStackTrace();
}
}

@Override
public TypeInformation<Row> getResultType() {
return Types.ROW(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING);
}
}


Reply | Threaded
Open this post in threaded view
|

Re: Flink1.9.1 TableFunction Unable to serialize

Benchao Li
Hi Polarisary,

The fields of your `UserTableFunction` maybe not serializable like `Connection` and `PreparedStatement`. So you can make them `transient` and let them not participate in the serialization.

Hope this helps.

Polarisary <[hidden email]> 于2019年12月26日周四 下午4:47写道:
Hi all
When I use udf, it throws Unable to serialize Exception as follows:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to serialize object 'UserTableFunction' of class ‘....udtf.UserTableFunction'.
at org.apache.flink.table.utils.EncodingUtils.encodeObjectToString(EncodingUtils.java:72)
at org.apache.flink.table.functions.UserDefinedFunction.functionIdentifier(UserDefinedFunction.java:45)
at org.apache.flink.table.planner.codegen.CodeGenUtils$.udfFieldName(CodeGenUtils.scala:715)
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:615)


My udf as follows.  
public class UserTableFunction extends TableFunction<Row> {
private static final long serialVersionUID = 1L;
private HikariCPUtils dbUtils = new HikariCPUtils();
protected Connection connection;


protected PreparedStatement preparedStatement = null;
@Override
public void open(FunctionContext context) throws Exception {
connection = dbUtils.getConnection();
}

@Override
public void close() throws Exception {
if (null != connection)
connection.close();
if (null != preparedStatement)
preparedStatement.close();
}

public void eval(long uid, int countryId) {
...

Row row =
new Row(8);
try {
...

collect(row);
}
catch (SQLException e) {
e.printStackTrace();
}
}

@Override
public TypeInformation<Row> getResultType() {
return Types.ROW(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING);
}
}




--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]