I need to bootstrap keyed state for a keyed process function on Flink 1.12.1, so I think I am required to use the DataSet API.
I was hoping to read the source data with the Table/SQL API, because I thought it could parallelize the work more efficiently via partitioning.
Is JdbcInputFormat my only option?
This is what I tried:
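import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
// Register the JDBC-backed table (column list and credentials elided).
batchTableEnv.executeSql(
    "CREATE TABLE my_table (" +
    "  ...." +
    ") WITH (" +
    "  'connector.type' = 'jdbc'," +
    "  'connector.url' = '?'," +
    "  'connector.username' = '?'," +
    "  'connector.password' = '?'," +
    "  'connector.table' = 'my_table'" +
    ")");
// Query the table and convert the result to a DataSet for bootstrapping.
Table table = batchTableEnv.sqlQuery("SELECT name, step FROM my_table");
DataSet<Row> rowDataSet = batchTableEnv.toDataSet(table, Row.class);
rowDataSet.print();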
The toDataSet call throws this exception:
org.apache.flink.table.api.TableException: Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment.
at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116)
at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580)
at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555)
at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537)
at org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101)
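
To make the kind of partitioned read I have in mind concrete, here is a minimal plain-Java sketch with no Flink dependency. It splits a numeric key range into contiguous inclusive sub-ranges, each of which could be bound to a query such as SELECT name, step FROM my_table WHERE id BETWEEN ? AND ? and read by a separate parallel task. The class KeyRangeSplitter, its split method, and the numeric id column are all hypothetical names for illustration; my table may not have such a column. I believe the JDBC connector's JdbcNumericBetweenParametersProvider does something along these lines for JdbcInputFormat, but I have not confirmed that its splitting matches this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class KeyRangeSplitter {

    /**
     * Splits the inclusive range [min, max] into at most `batches`
     * contiguous inclusive sub-ranges of near-equal size.
     * Hypothetical helper; not part of Flink.
     */
    public static List<long[]> split(long min, long max, int batches) {
        if (min > max || batches < 1) {
            throw new IllegalArgumentException("need min <= max and batches >= 1");
        }
        long total = max - min + 1;              // number of keys in the range
        long n = Math.min(batches, total);       // never create empty sub-ranges
        long base = total / n;                   // minimum size of each sub-range
        long remainder = total % n;              // first `remainder` ranges get one extra key

        List<long[]> ranges = new ArrayList<>();
        long start = min;
        for (long i = 0; i < n; i++) {
            long size = base + (i < remainder ? 1 : 0);
            long end = start + size - 1;
            ranges.add(new long[] {start, end});
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Assumes a numeric key column named `id` (an assumption, see above).
        for (long[] r : split(1, 10, 3)) {
            System.out.println(
                "SELECT name, step FROM my_table WHERE id BETWEEN " + r[0] + " AND " + r[1]);
        }
    }
}
```

Each sub-range would then correspond to one input split, so the read parallelism is bounded by the number of batches rather than by anything the SQL planner decides.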