Hi,
I can't read from a file source using the sql-client tool.

I set up a simple test scenario with the configuration file in [1]. I'm getting the error in [2] when starting the environment with

bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib

in the standard Flink 1.9.1 download environment.

From the documentation at
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
I expected the file system connector to be built in.

I'm getting a similar issue while following the sql-training tutorial from https://github.com/ververica/sql-training/wiki. There I changed the sql-client-conf.yaml file used by the Docker container for the SQL client.

Thanks for any hints.

Günter

[1]

################################################################################
# Define table sources here. See the Table API & SQL documentation for details.

tables:
  - name: Guenter
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: file:///home/swissbib/temp/trash/hello.txt
    format:
      type: csv
      # required: define the schema either by using type information
      schema: "ROW(test STRING)"
      # or use the table's schema
      derive-schema: true
      field-delimiter: ";"         # optional: field delimiter character (',' by default)
      line-delimiter: "\r\n"       # optional: line delimiter ("\n" by default; otherwise "\r" or "\r\n" are allowed)
      quote-character: "'"         # optional: quote character for enclosing field values ('"' by default)
      allow-comments: true         # optional: ignores comment lines that start with "#" (disabled by default)
                                   #   if enabled, make sure to also ignore parse errors to allow empty rows
      ignore-parse-errors: true    # optional: skip fields and rows with parse errors instead of failing;
                                   #   fields are set to null in case of errors
      array-element-delimiter: "|" # optional: the array element delimiter string for separating
                                   #   array and row element values (";" by default)
      escape-character: "\\"       # optional: escape character for escaping values (disabled by default)
      null-literal: "n/a"          # optional: null literal string that is interpreted as a
                                   #   null value (disabled by default)

#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
  #planner: blink
  type: streaming                  # 'batch' or 'streaming' execution
  result-mode: table               # 'changelog' or 'table' presentation of results
  parallelism: 1                   # parallelism of the program
  max-parallelism: 128             # maximum parallelism
  min-idle-state-retention: 0      # minimum idle state retention in ms
  max-idle-state-retention: 0      # maximum idle state retention in ms

#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties allow for describing the cluster to which table
# programs are submitted to.

deployment:
  type: standalone                 # only the 'standalone' deployment is supported
  response-timeout: 5000           # general cluster communication timeout in ms
  gateway-address: ""              # (optional) address from cluster to gateway
  gateway-port: 0                  # (optional) port from cluster to gateway

[2]

bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
Reading default environment from: file:/usr/local/swissbib/flink-1.9.1/gh/sql-client-conf.yaml
No session environment specified.
Validating current environment...

Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
    ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: No factory supports all properties.

The following properties are requested:
connector.path=file:///home/swissbib/temp/trash/hello.txt
connector.type=filesystem
format.allow-comments=true
format.array-element-delimiter=|
format.derive-schema=true
format.escape-character=\\
format.field-delimiter=;
format.ignore-parse-errors=true
format.line-delimiter=\r\n
format.null-literal=n/a
format.quote-character='
format.schema=ROW(test STRING)
format.type=csv
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
    at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:370)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:197)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:265)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$1(ExecutionContext.java:144)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:142)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:558)
    ... 4 more
Hi Günter,

The file system connector currently supports only the OldCsv format [1], but your YAML uses properties of the new CSV format, such as "format.allow-comments". Please check which properties OldCsv supports and use only those.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#file-system-connector

Best,
Jingsong Lee

On Mon, Feb 3, 2020 at 1:00 PM Günter Hipler <[hidden email]> wrote:
> Hi,
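For illustration, a table definition limited to OldCsv properties might look roughly like the sketch below. It is untested against the setup in this thread. It assumes the OldCsv property names from the Flink 1.9 documentation (`fields`, `field-delimiter`, `quote-character`, `comment-prefix`, `ignore-parse-errors`) and a table-level `schema` section. The type `VARCHAR` is taken from the documentation's examples and stands in for the original `STRING`. The new-format properties (`schema: "ROW(...)"`, `derive-schema`, `allow-comments`, `array-element-delimiter`, `escape-character`, `null-literal`) are left out.

```yaml
tables:
  - name: Guenter
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: file:///home/swissbib/temp/trash/hello.txt
    format:
      type: csv
      fields:                      # OldCsv: ordered list of format fields
        - name: test
          type: VARCHAR            # assumption: VARCHAR in place of STRING
      field-delimiter: ";"
      quote-character: "'"
      comment-prefix: "#"          # assumed OldCsv counterpart of allow-comments
      ignore-parse-errors: true
    schema:                        # table schema, declared separately from the format
      - name: test
        type: VARCHAR
```

Whether every one of these keys is accepted should be checked against the OldCsv section of the documentation for the Flink version in use.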