Could not find a suitable table factory for 'org.apache.flink.table.factories.CatalogFactory' in the classpath


Yik San Chan
This question is cross-posted on Stack Overflow https://stackoverflow.com/questions/66815572/could-not-find-a-suitable-table-factory-for-org-apache-flink-table-factories-ca.

I am running a PyFlink program that reads from the Hive table `mysource`, does some processing, then writes to the Hive table `mysink`.

```
hive (aiinfra)> describe mysource;
OK
a                   bigint
b                   bigint

hive (aiinfra)> describe mysink;
OK
c                   bigint
```

This is my directory tree:

```
.
├── deps
│   ├── flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar
├── hive.py
```

This is `hive.py`:

```python
import os
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=settings)

t_env.get_config().get_configuration().set_string(
    "pipeline.jars", f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

catalog_name = "myhive"
default_database = "aiinfra"
hive_conf_dir = "/data/apache/hive/apache-hive-2.1.0-bin/conf"

hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
t_env.register_catalog("myhive", hive_catalog)

# set the HiveCatalog as the current catalog of the session
t_env.use_catalog("myhive")

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

The above program works fine until I turn the catalog registration logic into SQL:

```python
import os
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=settings)

t_env.get_config().get_configuration().set_string(
    "pipeline.jars", f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

CREATE_CATALOG_DDL = """
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'aiinfra',
    'hive-conf-dir' = '/data/apache/hive/apache-hive-2.1.0-bin/conf'
)
"""

USE_CATALOG_DDL = """
USE CATALOG myhive
"""

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
t_env.execute_sql(CREATE_CATALOG_DDL)
t_env.execute_sql(USE_CATALOG_DDL)

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

Running the latter version with `python hive.py` throws an exception:

```
Traceback (most recent call last):
  File "/data/home/pal-flink/chenyisheng14418/feature-pipelines/pyflink/hive.py", line 42, in <module>
    t_env.execute_sql(SOURCE_DDL_1)
  File "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 766, in execute_sql
  File "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.CatalogFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
default-database=aiinfra
hive-conf-dir=/data/apache/hive/apache-hive-2.1.0-bin/conf
type=hive

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1078)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:991)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
```

However, it seems clear that `HiveCatalogFactory` is in the jar:

```
$ jar tf deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar  | grep CatalogFactory
org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.class
```

How can I fix this? Thanks.

Best,
Yik San

Re: Could not find a suitable table factory for 'org.apache.flink.table.factories.CatalogFactory' in the classpath

Chesnay Schepler
Please try copying the connector jar into the lib/ or opt/ directory.
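For example (a sketch, assuming the Flink installation path shown in the question's traceback; restart the cluster afterwards so the jar is picked up):

```shell
# Copy the Hive connector jar into Flink's lib/ directory.
# Unlike pipeline.jars (which this setup uses), jars in lib/ are on the
# client classpath when CREATE CATALOG is planned, so HiveCatalogFactory
# can be discovered by the factory service.
cp deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar \
   /data/apache/flink/flink-1.12.0/lib/
```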

On 3/26/2021 11:59 AM, Yik San Chan wrote:

> [quoted message trimmed]