no suitable table factory for file sources

Günter Hipler-2
Hi,

I can't read from a file source using the sql-client tool.

I set up a simple test scenario with the configuration file shown in [1].

I'm getting the error shown in [2] when starting the environment with
bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
in the standard Flink 1.9.1 download distribution.

Reading the documentation at
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
I expected the filesystem connector to be "built in".

I ran into a similar issue while following the sql-training tutorial
from https://github.com/ververica/sql-training/wiki, where I modified the
sql-client-conf.yaml file used by the SQL client's Docker container.

Thanks for any hints

Günter




[1]

################################################################################
# Define table sources here. See the Table API & SQL documentation for details.

tables:

   - name: Guenter
     type: source-table
     update-mode: append
     connector:
       type: filesystem
       path: file:///home/swissbib/temp/trash/hello.txt


     format:
       type: csv
       # required: define the schema either by using type information
       schema: "ROW(test STRING)"

       # or use the table's schema
       derive-schema: true

       field-delimiter: ";"         # optional: field delimiter
character (',' by default)
       line-delimiter: "\r\n"       # optional: line delimiter ("\n" by
default; otherwise "\r" or "\r\n" are allowed)
       quote-character: "'"         # optional: quote character for
enclosing field values ('"' by default)
       allow-comments: true         # optional: ignores comment lines
that start with "#" (disabled by default)
       #   if enabled, make sure to also ignore parse errors to allow
empty rows
       ignore-parse-errors: true    # optional: skip fields and rows
with parse errors instead of failing;
       #   fields are set to null in case of errors
       array-element-delimiter: "|" # optional: the array element
delimiter string for separating
       #   array and row element values (";" by default)
       escape-character: "\\"       # optional: escape character for
escaping values (disabled by default)
       null-literal: "n/a"          # optional: null literal string that
is interpreted as a
       #   null value (disabled by default)



#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
   #planner: blink
   type: streaming              # 'batch' or 'streaming' execution
   result-mode: table           # 'changelog' or 'table' presentation of results
   parallelism: 1               # parallelism of the program
   max-parallelism: 128         # maximum parallelism
   min-idle-state-retention: 0  # minimum idle state retention in ms
   max-idle-state-retention: 0  # maximum idle state retention in ms

#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties allow for describing the cluster to which table
# programs are submitted.

deployment:
   type: standalone             # only the 'standalone' deployment is supported
   response-timeout: 5000       # general cluster communication timeout in ms
   gateway-address: ""          # (optional) address from cluster to gateway
   gateway-port: 0              # (optional) port from cluster to gateway

[2]

bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
Reading default environment from:
file:/usr/local/swissbib/flink-1.9.1/gh/sql-client-conf.yaml
No session environment specified.
Validating current environment...

Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
     at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
     at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
     at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
     at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
     ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: No factory supports all properties.

The following properties are requested:
connector.path=file:///home/swissbib/temp/trash/hello.txt
connector.type=filesystem
format.allow-comments=true
format.array-element-delimiter=|
format.derive-schema=true
format.escape-character=\\
format.field-delimiter=;
format.ignore-parse-errors=true
format.line-delimiter=\r\n
format.null-literal=n/a
format.quote-character='
format.schema=ROW(test STRING)
format.type=csv
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
     at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:370)
     at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:197)
     at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
     at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:265)
     at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$1(ExecutionContext.java:144)
     at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
     at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:142)
     at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:558)
     ... 4 more



Re: no suitable table factory for file sources

Jingsong Li
Hi Günter,

The filesystem connector currently only supports the OldCsv format [1],
but your YAML uses properties of the new CSV format, such as "format.allow-comments".

You can check which properties the old CSV format supports.
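
For illustration, here is a minimal sketch (not part of the original reply) of how the table definition from [1] could look when restricted to old CSV format properties as listed in the Flink 1.9 connector documentation. The table name, file path, and field name are taken from the original config; VARCHAR is used instead of STRING for the field type, and the exact set of supported keys should be verified against the documentation.

tables:

   - name: Guenter
     type: source-table
     update-mode: append
     connector:
       type: filesystem
       path: file:///home/swissbib/temp/trash/hello.txt
     format:
       type: csv                    # old CSV format: declare the format fields explicitly
       fields:
         - name: test
           type: VARCHAR
       field-delimiter: ";"         # optional: field delimiter character (',' by default)
       comment-prefix: "#"          # optional: lines starting with this prefix are skipped
       ignore-parse-errors: true    # optional: skip rows with parse errors instead of failing
     schema:                        # table schema, mirroring the format fields
       - name: test
         type: VARCHAR

Keys that belong only to the new CSV format (e.g. allow-comments, array-element-delimiter, escape-character, null-literal, derive-schema) are dropped in this sketch.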


Best,
Jingsong Lee
