Hi to all,
I've tried to register an external catalog and use it with the Table API in Flink 1.6.1. The following (Java) test job cannot write to a sink using insertInto because Flink cannot find the table by id (test.t2). Am I doing something wrong or is this a bug?

This is my Java test class:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.core.fs.FileSystem.WriteMode;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.table.catalog.ExternalCatalogTable;
    import org.apache.flink.table.catalog.InMemoryExternalCatalog;
    import org.apache.flink.table.descriptors.Csv;
    import org.apache.flink.table.descriptors.FileSystem;
    import org.apache.flink.table.descriptors.FormatDescriptor;
    import org.apache.flink.table.descriptors.Schema;
    import org.apache.flink.table.sinks.CsvTableSink;

    public class CatalogExperiment {

      public static void main(String[] args) throws Exception {
        // create an external catalog
        final String outPath = "file:/tmp/file2.txt";
        InMemoryExternalCatalog catalog = new InMemoryExternalCatalog("test");
        FileSystem connDescIn = new FileSystem().path("file:/tmp/file-test.txt");
        FileSystem connDescOut = new FileSystem().path(outPath);
        FormatDescriptor csvDesc = new Csv()
            .field("a", "string")
            .field("b", "string")
            .field("c", "string")
            .fieldDelimiter("\t");
        Schema schemaDesc = new Schema()
            .field("a", "string")
            .field("b", "string")
            .field("c", "string");
        ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)
            .withFormat(csvDesc)
            .withSchema(schemaDesc)
            .asTableSource();
        ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)
            .withFormat(csvDesc)
            .withSchema(schemaDesc)
            .asTableSink();
        catalog.createTable("t1", t1, true);
        catalog.createTable("t2", t2, true);

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
        btEnv.registerExternalCatalog("test", catalog);

        // this does not work ---------------------------------------
        btEnv.scan("test", "t1").insertInto("test.t2");
        // ERROR: No table was registered under the name test.t2

        // this works ---------------------------------------
        btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t", 1, WriteMode.OVERWRITE));
        env.execute();
      }
    }

Best,
Flavio
|
Have you tried "t2" instead of "test.t2"? It is possible that the catalog name isn't part of the table name in the Table API.
Thanks,
Xuefu
|
I've tried with t2, test.t2 and test.test.t2.
On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <[hidden email]> wrote:
|
Any other help here? Is this a bug, or is something wrong in my code?
On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier <[hidden email]> wrote:
|
Hi Flavio,
the external catalog support is not feature complete yet. I think you can only specify the catalog when reading from a table, but `insertInto` does not consider the catalog name.
Regards,
Timo
On 25.10.18 at 10:04, Flavio Pompermaier wrote:
|
Ok, thanks! I wasn't aware of this. That would undoubtedly be useful ;)
On Thu, Oct 25, 2018 at 2:00 PM Timo Walther <[hidden email]> wrote:
|
IIRC, that was recently fixed. Might come out with 1.6.2 / 1.7.0.
Cheers,
Fabian
Flavio Pompermaier <[hidden email]> wrote on Thu, 25 Oct 2018, 14:09:
|
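The behaviour Timo describes above (the catalog is honoured when reading, but `insertInto` ignores it) can be pictured with a small toy model. This is only a sketch and not Flink code: the class `CatalogLookupSketch`, its methods, and the use of plain strings as stand-ins for tables are all hypothetical, and it assumes that `insertInto` treats its argument as one flat name looked up among tables registered directly on the environment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model of the lookup behaviour described in this thread; NOT Flink's implementation. */
public class CatalogLookupSketch {

    // Hypothetical flat registry: tables registered directly on the environment.
    private final Map<String, String> rootTables = new HashMap<>();

    // Hypothetical external catalogs: catalog name -> (table name -> table).
    private final Map<String, Map<String, String>> catalogs = new HashMap<>();

    public void registerTable(String name, String table) {
        rootTables.put(name, table);
    }

    public void registerCatalogTable(String catalog, String name, String table) {
        catalogs.computeIfAbsent(catalog, k -> new HashMap<>()).put(name, table);
    }

    /** Catalog-aware lookup, modelled on a call such as scan("test", "t1"). */
    public Optional<String> resolveForScan(String... path) {
        if (path.length == 1) {
            return Optional.ofNullable(rootTables.get(path[0]));
        }
        if (path.length == 2) {
            Map<String, String> catalog = catalogs.get(path[0]);
            return catalog == null
                    ? Optional.empty()
                    : Optional.ofNullable(catalog.get(path[1]));
        }
        return Optional.empty();
    }

    /** Flat lookup (assumption): the whole string is one name; catalogs are never consulted. */
    public Optional<String> resolveForInsert(String name) {
        return Optional.ofNullable(rootTables.get(name));
    }

    public static void main(String[] args) {
        CatalogLookupSketch env = new CatalogLookupSketch();
        env.registerCatalogTable("test", "t1", "csv-source");
        env.registerCatalogTable("test", "t2", "csv-sink");

        // Reading through the catalog path finds the table.
        System.out.println("scan(test, t1): " + env.resolveForScan("test", "t1").isPresent());

        // None of the names tried in the thread match a flat entry.
        for (String name : new String[] {"t2", "test.t2", "test.test.t2"}) {
            System.out.println("insertInto(" + name + "): "
                    + env.resolveForInsert(name).isPresent());
        }

        // Registering the sink directly makes the flat lookup succeed.
        env.registerTable("t2", "csv-sink");
        System.out.println("insertInto(t2) after direct registration: "
                + env.resolveForInsert("t2").isPresent());
    }
}
```

Under this model, every name Flavio tried fails for the same reason: the sink exists only inside the catalog. In the actual job, the counterpart of the last step is to hand the sink to the table environment explicitly, as the working `writeToSink(new CsvTableSink(...))` call in the original post does, or to register the sink directly on the table environment if your Flink version offers a method for that.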