Error when creating InMemoryExternalCatalog to populate using another stream.

Felipe Gutierrez
Hi,

I am trying to use the InMemoryExternalCatalog to register a table using the Java Table API 1.8. I want to update this table with another stream that I will be reading. Then I plan to use the values in my InMemoryExternalCatalog to execute other queries. Is that a reasonable plan with the Flink Table API? This is my code [1].

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.registerExternalCatalog(inMemoryCatalogHLL, getExternalCatalog());
Table result = tableEnv.scan(inMemoryCatalogHLL, inMemoryTableHLL);
result.printSchema();


private ExternalCatalog getExternalCatalog() {
    // create an in-memory external catalog
    InMemoryExternalCatalog catalog = new InMemoryExternalCatalog(externalCatalog01);
    assertTrue(catalog.listTables().isEmpty());
    assertTrue(catalog.listSubCatalogs().isEmpty());
    // catalog.createSubCatalog(inMemoryCatalogHLL, catalog, false);
    // catalog.getSubCatalog(inMemoryCatalogHLL);
    // catalog.getTable("").
    catalog.createTable(inMemoryTableHLL, getExternalCatalogTable(), false);

    // catalog.createTable(inMemoryTableHLL, catalog.getTable(inMemoryTableHLL),
    // false);
    return catalog;
}

private ExternalCatalogTable getExternalCatalogTable() {
    // @formatter:off
    FileSystem connectorDescriptor = new FileSystem().path("file:/tmp/file-test.txt");
    // FileSystem connectorDescriptor = new FileSystem().path("resources/externalCatalog01.csv");
    // FormatDescriptor csvDesc = new Csv().field("a", "string").field("b",
    // "string").field("c", "string").fieldDelimiter("\t");
    // ExternalCatalogTable t1 =
    // ExternalCatalogTable.builder(connDescIn).withFormat(csvDesc).withSchema(schemaDesc).asTableSource();

    // format
    OldCsv csv = new OldCsv();
    csv.field("table", BasicTypeInfo.STRING_TYPE_INFO);
    csv.field("column", BasicTypeInfo.STRING_TYPE_INFO);
    csv.field("count", BasicTypeInfo.INT_TYPE_INFO);
    csv.fieldDelimiter(",");

    // schema
    Schema schema = new Schema();
    schema.field("table", BasicTypeInfo.STRING_TYPE_INFO);
    schema.field("column", BasicTypeInfo.STRING_TYPE_INFO);
    schema.field("count", BasicTypeInfo.INT_TYPE_INFO);

    return new ExternalCatalogTableBuilder(connectorDescriptor)
        .withSchema(schema)
        .withFormat(csv)
        .asTableSource();
    // @formatter:on
}

I get the error below. Could anyone help, please? Thanks.

Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.path=file:/tmp/file-test.txt
connector.property-version=1
connector.type=filesystem
format.field-delimiter=,
format.fields.0.name=table
format.fields.0.type=VARCHAR
format.fields.1.name=column
format.fields.1.type=VARCHAR
format.fields.2.name=count
format.fields.2.type=INT
format.property-version=1
format.type=csv
schema.0.name=table
schema.0.type=VARCHAR
schema.1.name=column
schema.1.type=VARCHAR
schema.2.name=count
schema.2.type=INT

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory

at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
at org.apache.flink.table.catalog.ExternalTableUtil$.createTableSource(ExternalTableUtil.scala:76)
at org.apache.flink.table.catalog.ExternalTableUtil$.fromExternalCatalogTable(ExternalTableUtil.scala:51)
at org.apache.flink.table.catalog.ExternalCatalogSchema.getTable(ExternalCatalogSchema.scala:82)
at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:288)
at org.apache.calcite.jdbc.CalciteSchema$SchemaPlusImpl.getTable(CalciteSchema.java:648)
at org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:664)
at org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:621)
at org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:620)
at org.sense.flink.examples.stream.table.HelloWordExternalCatalogTableAPI.<init>(HelloWordExternalCatalogTableAPI.java:55)
at org.sense.flink.examples.stream.table.HelloWordExternalCatalogTableAPI.main(HelloWordExternalCatalogTableAPI.java:32)
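
To make "No context matches" easier to read, here is a minimal sketch of what a context check of this kind might look like. It is a simplified stand-in, not Flink's actual code: the class `ContextMatchSketch` and its methods are hypothetical, and the idea that the streaming CSV source factory requires an `update-mode=append` property is an assumption, not something the error output above confirms. The sketch only shows that a factory is rejected when any property it requires is missing from the requested ones.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of factory context matching (not Flink code).
public class ContextMatchSketch {

    // A factory matches only if every required key is requested with the same value.
    static boolean matchesContext(Map<String, String> required, Map<String, String> requested) {
        for (Map.Entry<String, String> entry : required.entrySet()) {
            if (!entry.getValue().equals(requested.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // The connector/format identification properties listed in the error message.
    static Map<String, String> requestedFromPost() {
        Map<String, String> props = new HashMap<>();
        props.put("connector.type", "filesystem");
        props.put("connector.property-version", "1");
        props.put("format.type", "csv");
        props.put("format.property-version", "1");
        return props;
    }

    // ASSUMPTION: the context a streaming CSV source factory might require.
    static Map<String, String> assumedStreamCsvContext() {
        Map<String, String> context = new HashMap<>(requestedFromPost());
        context.put("update-mode", "append"); // assumed key, absent from the requested list
        return context;
    }

    public static void main(String[] args) {
        Map<String, String> requested = requestedFromPost();
        System.out.println("as posted: " + matchesContext(assumedStreamCsvContext(), requested));

        // Same request with the assumed property added.
        requested.put("update-mode", "append");
        System.out.println("with update-mode: " + matchesContext(assumedStreamCsvContext(), requested));
    }
}
```

If that assumption holds, the requested properties above contain no `update-mode` entry, which would explain why no streaming factory is selected. Declaring append mode on the table descriptor before `asTableSource()` might then be worth trying; this is untested against the setup in this post.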

[1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/table/HelloWordExternalCatalogTableAPI.java
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez