Hello! I have a Scala 2.12 project which registers some tables (that get
their data from Kafka in JSON form) to the StreamTableEnvironment via the executeSql command before calling execute on the StreamExecutionEnvironment. Everything behaves as expected until I either try to set /'format.ignore-parse-errors' = 'true'/ in the connector options, or I try to add the Kafka record timestamp as a table field via /`ts` TIMESTAMP(3) METADATA FROM 'timestamp'/. In both of these case I get: *Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.* *Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath. Reason: No factory supports all properties. * Additionally, for ignoring parsing errors: *The matching candidates: org.apache.flink.formats.json.JsonRowFormatFactory Unsupported property keys: format.ignore-parse-errors* While, for the timestamp field: *The matching candidates: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory Unsupported property keys: schema.#.metadata schema.#.virtual* Here is the DDL code used for table creation: / "CREATE TEMPORARY TABLE `" + name + "` (" + tableFields + ") " + "WITH (" + "'connector.type' = 'kafka', " + "'connector.version' = 'universal', " + "'connector.topic' = '" + name + "', " + "'connector.properties.bootstrap.servers' = '" + kafkaAddr + "', " + "'connector.startup-mode' = '" + (if (checkLatest) "latest-offset" else "earliest-offset") + "', " + "'connector.properties.default.api.timeout.ms' = '5000', " + "'format.type' = 'json', " + "'format.fail-on-missing-field' = 'false'" + ")" / And here is the Flink-related config from build.sbt: / lazy val flinkVersion = "1.12.0" libraryDependencies ++= Seq( "org.apache.flink" %% "flink-scala" % flinkVersion, "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, "org.apache.flink" %% "flink-connector-kafka" % flinkVersion, "org.apache.flink" %% "flink-clients" % flinkVersion, "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion, "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion, "org.apache.flink" % "flink-json" % flinkVersion, "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test, "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier "tests", "org.apache.flink" %% "flink-streaming-java" % flinkVersion % Test classifier "tests", ) / I would appreciate any tips on getting both the timestamp and the error parse setting to work. Thank you in advance! -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi, abelm ~ Which version Flink did you use? We did some refactoring for the connector options since Flink 1.11. The METADATA syntax is only supported since version 1.12. In 1.11, to ignore the parse errors, you need to use option "json.ignore-parse-error" [1] abelm <[hidden email]> 于2020年12月10日周四 上午1:13写道: Hello! I have a Scala 2.12 project which registers some tables (that get |
Hi! Thank you for the reply!
I understand that the metadata syntax is only available as of 1.12, but I am indeed trying to use Flink 1.12. Regarding the option for ignoring parse errors: I have already noticed from before that, according to the docs, even in Flink 1.11 (which is the version that the project was running on before), the option should be 'json.ignore-parse-errors', but for some strange reason, to set fail on missing field to false, I also only seemed to get it to work with 'format.fail-on-missing-field', instead of the 'json.fail-on-missing-field' option stated in the docs. For that reason, I feel like I might be doing something wrong in terms of dependencies, given that so many of my connector/format options seem to require old syntax instead of the newer one from Flink 1.11 and 1.12 to run. As a further example, you might notice in my original message that I'm also using 'connector.type' = 'kafka' instead of 'connector' = 'kafka', because for some reason that's the only version that worked. Genuinely not sure how I managed to create such a problem, so any further help would be appreciated. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
One thing needs to note is that the old connectors are still in the release-1.11/release-1.12 jars. So the old option still works but with the old connector codes. You may need to find the root cause why the new options do not work, maybe some stack trace here ? abelm <[hidden email]> 于2020年12月10日周四 下午10:54写道: Hi! Thank you for the reply! |
Hi again!
I did not realise both the new and old codes are supported in the later versions of the connector, but that was indeed the problem. Updated all of the options to use the new syntax and everything works like a charm. I should've definitely been a bit more careful about that 😅 Thanks a lot for the help! -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Free forum by Nabble | Edit this page |