Required context properties mismatch in connecting the flink with mysql database


Required context properties mismatch in connecting the flink with mysql database

xi sizhe

I am using the latest Flink (1.11.2) to work with a sample MySQL database, and the database itself is working fine.

Additionally, I have added flink-connector-jdbc_2.11-1.11.2.jar, mysql-connector-java-8.0.21.jar, and postgresql-42.2.17.jar to {FLINK}/lib.

Here is my code

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, BatchTableEnvironment

T_CONFIG = TableConfig()
B_EXEC_ENV = ExecutionEnvironment.get_execution_environment()
B_EXEC_ENV.set_parallelism(1)
BT_ENV = BatchTableEnvironment.create(B_EXEC_ENV, T_CONFIG)

ddl = """
            CREATE TABLE nba_player4 (
                 first_name STRING ,
                 last_name STRING,
                 email STRING,
                 id INT
            ) WITH (
                'connector' = 'jdbc',
                'url' = 'jdbc:mysql://localhost:3306/inventory',
                'username' = 'root',
                'password' = 'debezium',
                'table-name' = 'customers'
            )
      """;
BT_ENV.sql_update(ddl)

sinkddl = """
        CREATE TABLE print_table (
         f0 INT,
         f1 INT,
         f2 STRING,
         f3 DOUBLE
        ) WITH (
         'connector' = 'print'
        )
      """;
BT_ENV.sql_update(sinkddl)


sqlquery("SELECT first_name, last_name  FROM nba_player4 ");
BT_ENV.execute("table_job")

However, when running the code, it comes up with an error saying:

py4j.protocol.Py4JJavaError: An error occurred while calling o23.sqlQuery.
: org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector=jdbc
password=debezium
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=first_name
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=last_name
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=email
schema.3.data-type=INT
schema.3.name=id
table-name=customers
url=jdbc:mysql://localhost:3306/inventory
username=root

The following factories have been considered:
org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.filesystem.FileSystemTableFactory

Update:

This is my docker-compose file:

version: '2.1'
services:
  jobmanager:
    build: .
    image: flink:latest
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:latest
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  mysql:
    image: debezium/example-mysql
    ports:
     - "3306:3306"
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw 

docker ps shows:

CONTAINER ID        IMAGE                       COMMAND                  CREATED             STATUS              PORTS                                                            NAMES
cf84c84f7821        flink      "/docker-entrypoint.…"   2 minutes ago       Up 2 minutes        6121-6123/tcp, 8081/tcp                                          _taskmanager_1
09b19142d70a        flink      "/docker-entrypoint.…"   9 minutes ago       Up 9 minutes        6123/tcp, 0.0.0.0:8081->8081/tcp                                 _jobmanager_1
4ac01eb11bf7        debezium/example-mysql      "docker-entrypoint.s…"   3 days ago          Up 9 minutes        0.0.0.0:3306->3306/tcp, 33060/tcp                                keras-flask-dep

More info:

My current Flink image in Docker is flink:scala_2.12-java8:

docker pull flink:scala_2.12-java8

The PyFlink JDBC connector jar is flink-connector-jdbc_2.11-1.11.2.jar, from the Flink 1.11 release:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

In order to use the JDBC library, I tried two approaches:

  1. Save flink-connector-jdbc_2.11-1.11.2.jar into /usr/local/lib/python3.7/site-packages/pyflink/lib

  2. Configure the classpath in the Python app:

     base_dir = "/Users/huhu/Documents/projects/webapp/libs/"
     flink_jdbc_jar = f"file://{base_dir}flink-connector-jdbc_2.11-1.11.2.jar"

     BT_ENV.get_config().get_configuration().set_string("pipeline.jars", flink_jdbc_jar)
    

But I am still getting the same error.


Re: Required context properties mismatch in connecting the flink with mysql database

Dawid Wysakowicz

Hi,

I think the problem is that you are using BatchTableEnvironment, which is deprecated and does not support newer features such as FLIP-95 sources/sinks. I am sorry this is not more prominent in the documentation.

I am not too familiar with the Python API, and I am not sure if the unified TableEnvironment is available there. In Java/Scala I'd recommend using the unified TableEnvironment. If it is not available in the Python API, you can use the StreamTableEnvironment, which actually extends the unified one.
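Something along these lines should work in PyFlink 1.11 (an untested sketch: the source DDL is taken from your post, while the print sink schema and the final INSERT are assumptions about how the job would be wired up):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

# StreamTableEnvironment with the blink planner picks up the new (FLIP-95)
# 'connector' = 'jdbc' factory, unlike the deprecated BatchTableEnvironment.
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

# execute_sql replaces the deprecated sql_update in 1.11
t_env.execute_sql("""
    CREATE TABLE nba_player4 (
        first_name STRING,
        last_name STRING,
        email STRING,
        id INT
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/inventory',
        'username' = 'root',
        'password' = 'debezium',
        'table-name' = 'customers'
    )
""")

# Print sink whose schema matches the query below (schema assumed for this sketch)
t_env.execute_sql("""
    CREATE TABLE print_table (
        first_name STRING,
        last_name STRING
    ) WITH (
        'connector' = 'print'
    )
""")

# Submitting the INSERT statement runs the job; no separate execute() call is needed
t_env.execute_sql("INSERT INTO print_table SELECT first_name, last_name FROM nba_player4")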

Moreover, please make sure you are using the same component versions, or you might face hard-to-track problems. You are mixing components built for different Scala versions: your cluster uses Scala 2.12, but you are adding Scala 2.11 dependencies.
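For example (the jar names below are an assumption based on the standard Maven artifact names for Flink 1.11.2 built against Scala 2.12; pipeline.jars takes a ';'-separated list of jar URLs, and the MySQL JDBC driver typically needs to be on that classpath as well, unless it is already in the cluster's lib directory):

base_dir = "/Users/huhu/Documents/projects/webapp/libs/"
jars = ";".join([
    f"file://{base_dir}flink-connector-jdbc_2.12-1.11.2.jar",   # connector built for Scala 2.12, matching the cluster
    f"file://{base_dir}mysql-connector-java-8.0.21.jar",        # MySQL JDBC driver
])
t_env.get_config().get_configuration().set_string("pipeline.jars", jars)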

Best,

Dawid
