This question is cross-posted on Stack Overflow https://stackoverflow.com/questions/66815572/could-not-find-a-suitable-table-factory-for-org-apache-flink-table-factories-ca.
I am running a PyFlink program that reads from the Hive `mysource` table, does some processing, then writes to the Hive `mysink` table.

```
hive (aiinfra)> describe mysource;
OK
a bigint
b bigint

hive (aiinfra)> describe mysink;
OK
c bigint
```

This is my project tree.

```
.
├── deps
│   ├── flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar
├── hive.py
```

This is `hive.py`.

```python
import os

from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=settings)

t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

catalog_name = "myhive"
default_database = "aiinfra"
hive_conf_dir = "/data/apache/hive/apache-hive-2.1.0-bin/conf"

hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
t_env.register_catalog("myhive", hive_catalog)

# set the HiveCatalog as the current catalog of the session
t_env.use_catalog("myhive")

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

The above program works fine until I turn the catalog registration logic into SQL DDL:

```python
import os

from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=settings)

t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

CREATE_CATALOG_DDL = """
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'aiinfra',
    'hive-conf-dir' = '/data/apache/hive/apache-hive-2.1.0-bin/conf'
)
"""

USE_CATALOG_DDL = """
USE CATALOG myhive
"""

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
t_env.execute_sql(CREATE_CATALOG_DDL)
t_env.execute_sql(USE_CATALOG_DDL)

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

Running the latter version with `python hive.py` throws an exception:

```
Traceback (most recent call last):
  File "/data/home/pal-flink/chenyisheng14418/feature-pipelines/pyflink/hive.py", line 42, in <module>
    t_env.execute_sql(SOURCE_DDL_1)
  File "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 766, in execute_sql
  File "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
'org.apache.flink.table.factories.CatalogFactory' in the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
default-database=aiinfra
hive-conf-dir=/data/apache/hive/apache-hive-2.1.0-bin/conf
type=hive

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1078)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:991)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
```

However, `HiveCatalogFactory` is clearly inside the jar I ship via `pipeline.jars`:

```
$ jar tf deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar | grep CatalogFactory
org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.class
```

How can I fix this? Thanks.

Best,
Yik San
Please try copying the connector jar into the lib/ directory of your Flink distribution (jars under opt/ are not on the classpath by default). As far as I know, jars referenced via `pipeline.jars` are only shipped with the job itself, whereas `CREATE CATALOG` looks up the catalog factory through the service loader on the framework classpath, which is why `HiveCatalogFactory` is not found even though the class is inside your jar.
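A minimal sketch of what I mean, assuming your Flink distribution lives at /data/apache/flink/flink-1.12.0 (the path that appears in your traceback) and that you submit to a standalone cluster:

```sh
# Put the Hive SQL connector on Flink's framework classpath.
cp deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar /data/apache/flink/flink-1.12.0/lib/

# Restart the cluster so the JobManager and TaskManagers pick up the new jar.
/data/apache/flink/flink-1.12.0/bin/stop-cluster.sh
/data/apache/flink/flink-1.12.0/bin/start-cluster.sh

# Re-run the job.
python hive.py
```

For a purely local run the restart is unnecessary: the next `python hive.py` invocation starts its JVM with everything under lib/ already on the classpath.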