Flink 1.12, Kafka Connector, JSON format - "Could not find a suitable table factory"


abelm
Hello! I have a Scala 2.12 project which registers some tables (that get
their data from Kafka in JSON form) to the StreamTableEnvironment via the
executeSql command before calling execute on the StreamExecutionEnvironment.

Everything behaves as expected until I either try to set
'format.ignore-parse-errors' = 'true' in the connector options, or I try
to add the Kafka record timestamp as a table field via `ts` TIMESTAMP(3)
METADATA FROM 'timestamp'. In both of these cases I get:

Exception in thread "main" org.apache.flink.table.api.TableException:
findAndCreateTableSource failed.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory supports all properties.

Additionally, for ignoring parsing errors:

The matching candidates:
org.apache.flink.formats.json.JsonRowFormatFactory
Unsupported property keys:
format.ignore-parse-errors

While, for the timestamp field:

The matching candidates:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Unsupported property keys:
schema.#.metadata
schema.#.virtual

Here is the DDL code used for table creation:
    "CREATE TEMPORARY TABLE `" + name + "` (" + tableFields + ") " +
      "WITH (" +
      "'connector.type' = 'kafka', " +
      "'connector.version' = 'universal', " +
      "'connector.topic' = '" + name + "', " +
      "'connector.properties.bootstrap.servers' = '" + kafkaAddr + "', " +
      "'connector.startup-mode' = '" +
      (if (checkLatest) "latest-offset" else "earliest-offset") +
      "', " +
      "'connector.properties.default.api.timeout.ms' = '5000', " +
      "'format.type' = 'json', " +
      "'format.fail-on-missing-field' = 'false'" +
      ")"

And here is the Flink-related config from build.sbt:
lazy val flinkVersion = "1.12.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                  % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala"        % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka"        % flinkVersion,
  "org.apache.flink" %% "flink-clients"                % flinkVersion,
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
  "org.apache.flink" %% "flink-table-planner-blink"    % flinkVersion,
  "org.apache.flink"  % "flink-json"                   % flinkVersion,
  "org.apache.flink" %% "flink-test-utils"             % flinkVersion % Test,
  "org.apache.flink" %% "flink-runtime"                % flinkVersion % Test classifier "tests",
  "org.apache.flink" %% "flink-streaming-java"         % flinkVersion % Test classifier "tests"
)

I would appreciate any tips on getting both the timestamp field and the
parse-error setting to work. Thank you in advance!




Re: Flink 1.12, Kafka Connector, JSON format - "Could not find a suitable table factory"

Danny Chan-2
Hi, abelm ~

Which Flink version are you using? We did some refactoring of the connector options since Flink 1.11. The METADATA syntax is only supported since version 1.12.

In 1.11, to ignore parse errors, you need to use the option "json.ignore-parse-errors" [1].
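
On 1.12 the new factory stack is picked up via the new-style option keys. As a rough sketch (the topic name and broker address are placeholders, and I'm assuming a StreamTableEnvironment named tableEnv), a table using both of the features you mentioned would look like:

    tableEnv.executeSql(
      """CREATE TEMPORARY TABLE `events` (
        |  `payload` STRING,
        |  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'events',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'json',
        |  'json.fail-on-missing-field' = 'false',
        |  'json.ignore-parse-errors' = 'true'
        |)""".stripMargin)

Note that 'json.fail-on-missing-field' and 'json.ignore-parse-errors' cannot both be set to 'true' at the same time.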



Re: Flink 1.12, Kafka Connector, JSON format - "Could not find a suitable table factory"

abelm
Hi! Thank you for the reply!

I understand that the metadata syntax is only available as of 1.12, but I am
indeed trying to use Flink 1.12.

Regarding the option for ignoring parse errors: I had already noticed that,
according to the docs, the option should be 'json.ignore-parse-errors' even
in Flink 1.11 (the version the project was running on before). But strangely,
to set fail-on-missing-field to false, I could only get it to work with
'format.fail-on-missing-field' instead of the 'json.fail-on-missing-field'
option stated in the docs.

For that reason, I suspect I'm doing something wrong in terms of
dependencies, given that so many of my connector/format options seem to
require the old syntax instead of the newer one from Flink 1.11 and 1.12.
As a further example, you might notice in my original message that I'm also
using 'connector.type' = 'kafka' instead of 'connector' = 'kafka', because
for some reason that's the only variant that worked.

Genuinely not sure how I managed to create such a problem, so any further
help would be appreciated.




Re: Flink 1.12, Kafka Connector, JSON format - "Could not find a suitable table factory"

Danny Chan-2
One thing to note is that the old connectors are still shipped in the release-1.11/release-1.12 jars. So the old options still work, but they go through the old connector code.

You may need to find the root cause of why the new options do not work; maybe share a stack trace here?
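
Roughly speaking, the old 'connector.*'/'format.*' keys match the legacy factories (which do not know about METADATA columns or the 'json.*' options), while the new-style keys match the new factories. Based on the 1.12 docs, the keys in your DDL map over roughly like this (not exhaustive):

    'connector.type' = 'kafka'                     -> 'connector' = 'kafka'
    'connector.version' = 'universal'              -> (dropped; 'kafka' is the universal connector)
    'connector.topic'                              -> 'topic'
    'connector.properties.bootstrap.servers'       -> 'properties.bootstrap.servers'
    'connector.properties.default.api.timeout.ms'  -> 'properties.default.api.timeout.ms'
    'connector.startup-mode'                       -> 'scan.startup.mode'
    'format.type' = 'json'                         -> 'format' = 'json'
    'format.fail-on-missing-field'                 -> 'json.fail-on-missing-field'
    'format.ignore-parse-errors'                   -> 'json.ignore-parse-errors'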


Re: Flink 1.12, Kafka Connector, JSON format - "Could not find a suitable table factory"

abelm
Hi again!

I did not realise that both the new and the old connector code are shipped
in the later releases, but that was indeed the problem. I updated all of
the options to use the new syntax and everything works like a charm.
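
For reference, the WITH clause from my original snippet now looks roughly
like this (same name, kafkaAddr and checkLatest variables as before):

    "CREATE TEMPORARY TABLE `" + name + "` (" + tableFields + ") " +
      "WITH (" +
      // 1.12-style option keys throughout ('connector' instead of
      // 'connector.type', 'json.*' instead of 'format.*', etc.)
      "'connector' = 'kafka', " +
      "'topic' = '" + name + "', " +
      "'properties.bootstrap.servers' = '" + kafkaAddr + "', " +
      "'scan.startup.mode' = '" +
      (if (checkLatest) "latest-offset" else "earliest-offset") +
      "', " +
      "'properties.default.api.timeout.ms' = '5000', " +
      "'format' = 'json', " +
      "'json.fail-on-missing-field' = 'false', " +
      "'json.ignore-parse-errors' = 'true'" +
      ")"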

I should've definitely been a bit more careful about that 😅

Thanks a lot for the help!


