Hi,
I am trying to use the InMemoryExternalCatalog to register a table using the Java Table API 1.8. I want to update this table with another stream that I will be reading. Then I plan to use the values in my InMemoryExternalCatalog to execute other queries. Is that a reasonable plan to execute using the Flink Table API? This is my code [1].

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.registerExternalCatalog(inMemoryCatalogHLL, getExternalCatalog());
Table result = tableEnv.scan(inMemoryCatalogHLL, inMemoryTableHLL);
result.printSchema();

private ExternalCatalog getExternalCatalog() {
    // create an in-memory external catalog
    InMemoryExternalCatalog catalog = new InMemoryExternalCatalog(externalCatalog01);
    assertTrue(catalog.listTables().isEmpty());
    assertTrue(catalog.listSubCatalogs().isEmpty());
    // catalog.createSubCatalog(inMemoryCatalogHLL, catalog, false);
    // catalog.getSubCatalog(inMemoryCatalogHLL);
    // catalog.getTable("").
    catalog.createTable(inMemoryTableHLL, getExternalCatalogTable(), false);
    // catalog.createTable(inMemoryTableHLL, catalog.getTable(inMemoryTableHLL),
    // false);
    return catalog;
}

private ExternalCatalogTable getExternalCatalogTable() {
    // @formatter:off
    FileSystem connectorDescriptor = new FileSystem().path("file:/tmp/file-test.txt");
    // FileSystem connectorDescriptor = new FileSystem().path("resources/externalCatalog01.csv");
    // FormatDescriptor csvDesc = new Csv().field("a", "string").field("b",
    // "string").field("c", "string").fieldDelimiter("\t");
    // ExternalCatalogTable t1 =
    // ExternalCatalogTable.builder(connDescIn).withFormat(csvDesc).withSchema(schemaDesc).asTableSource();

    // format
    OldCsv csv = new OldCsv();
    csv.field("table", BasicTypeInfo.STRING_TYPE_INFO);
    csv.field("column", BasicTypeInfo.STRING_TYPE_INFO);
    csv.field("count", BasicTypeInfo.INT_TYPE_INFO);
    csv.fieldDelimiter(",");

    // schema
    Schema schema = new Schema();
    schema.field("table", BasicTypeInfo.STRING_TYPE_INFO);
    schema.field("column", BasicTypeInfo.STRING_TYPE_INFO);
    schema.field("count", BasicTypeInfo.INT_TYPE_INFO);

    return new ExternalCatalogTableBuilder(connectorDescriptor)
        .withSchema(schema)
        .withFormat(csv)
        .asTableSource();
    // @formatter:on
}

I get the error below. Could anyone help, please?
Thanks

Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in the classpath.

Reason: No context matches.

The following properties are requested:
connector.path=file:/tmp/file-test.txt
connector.property-version=1
connector.type=filesystem
format.field-delimiter=,
format.fields.0.name=table
format.fields.0.type=VARCHAR
format.fields.1.name=column
format.fields.1.type=VARCHAR
format.fields.2.name=count
format.fields.2.type=INT
format.property-version=1
format.type=csv
schema.0.name=table
schema.0.type=VARCHAR
schema.1.name=column
schema.1.type=VARCHAR
schema.2.name=count
schema.2.type=INT

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory

    at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
    at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
    at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
    at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
    at org.apache.flink.table.catalog.ExternalTableUtil$.createTableSource(ExternalTableUtil.scala:76)
    at org.apache.flink.table.catalog.ExternalTableUtil$.fromExternalCatalogTable(ExternalTableUtil.scala:51)
    at org.apache.flink.table.catalog.ExternalCatalogSchema.getTable(ExternalCatalogSchema.scala:82)
    at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
    at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:288)
    at org.apache.calcite.jdbc.CalciteSchema$SchemaPlusImpl.getTable(CalciteSchema.java:648)
    at org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:664)
    at org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:621)
    at org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:620)
    at org.sense.flink.examples.stream.table.HelloWordExternalCatalogTableAPI.<init>(HelloWordExternalCatalogTableAPI.java:55)
    at org.sense.flink.examples.stream.table.HelloWordExternalCatalogTableAPI.main(HelloWordExternalCatalogTableAPI.java:32)

[1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/table/HelloWordExternalCatalogTableAPI.java