Hi,
I cannot launch my Flink 1.11.2 application on EMR. It fails with this exception:

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

The full log is attached at the end. I checked some other threads, but none of them applies to my case. Here are my observations:

1. Dependency check: both flink-connector-kafka and flink-json are included in the final fat jar.
2. resources/META-INF/services/org.apache.flink.table.factories.TableFactory is included in the final fat jar and lists the following:
   - org.apache.flink.formats.json.JsonRowFormatFactory
   - org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
   I also noticed that the log shows only the identifier datagen. Neither kafka nor json appears there.
3. The application runs fine locally in IntelliJ.
4. The same jar does not work on EMR.

Please advise.

Thanks,
Fanbin

Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.analytics_service'.
Table options are:

'connector'='kafka'
'format'='json'
'json.ignore-parse-errors'='true'
'properties.bootstrap.servers'='localhost:9093'
'properties.group.id'='xxx'
'properties.security.protocol'='SSL'
'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1'
'properties.ssl.key.password'='secret'
'properties.ssl.keystore.location'='xxx.jks'
'properties.ssl.keystore.password'='secret'
'properties.ssl.keystore.type'='JKS'
'properties.ssl.truststore.location'='xxx.jks'
'properties.ssl.truststore.password'='secret'
'properties.ssl.truststore.type'='JKS'
'properties.zookeeper.connect'='localhost:2181'
'scan.startup.mode'='earliest-offset'
'topic'='events'
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
    at com.coinbase.ml.FeatureStoreJob.runSqlQuery(FeatureStoreJob.scala:133)
    at com.coinbase.ml.FeatureStoreJob.run(FeatureStoreJob.scala:36)
    at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:30)
    at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
    at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    ... 11 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''.
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
    ... 43 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
    ... 44 more
Hi,

Please verify that:

1. kafka-connector is indeed in the fat jar (e.g. by "jar vtf your-program.jar | grep KafkaDynamicTableFactory")
2. kafka-connector version matches the version of Flink distribution on EMR.

Regards,
Roman

On Tue, Nov 17, 2020 at 6:47 AM Fanbin Bu <[hidden email]> wrote:
All of those are verified. The issue was fixed by adding org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory to the service file META-INF/services/org.apache.flink.table.factories.Factory.

Thanks,
Fanbin

On Tue, Nov 17, 2020 at 7:29 AM Khachatryan Roman <[hidden email]> wrote:
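For anyone hitting the same error, the two checks discussed in this thread (is the factory class bundled, and is it registered in the service file) can be scripted. The following is only a minimal sketch, assuming the fat jar is an ordinary zip archive; the function names registered_factories and check_factory are made up for illustration and are not part of Flink.

```python
import zipfile

# Service file named in the fix above, and the factory class that was added to it.
SERVICE_FILE = "META-INF/services/org.apache.flink.table.factories.Factory"
KAFKA_FACTORY = "org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory"


def registered_factories(jar_path, service_file=SERVICE_FILE):
    """Return the class names listed in the jar's service file (empty list if absent)."""
    with zipfile.ZipFile(jar_path) as jar:
        if service_file not in jar.namelist():
            return []
        text = jar.read(service_file).decode("utf-8")
    entries = []
    for line in text.splitlines():
        name = line.split("#", 1)[0].strip()  # drop comments and surrounding whitespace
        if name:
            entries.append(name)
    return entries


def check_factory(jar_path, factory=KAFKA_FACTORY):
    """Report whether the factory's class file is bundled and whether it is registered."""
    class_entry = factory.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        bundled = class_entry in jar.namelist()
    registered = factory in registered_factories(jar_path)
    return {"bundled": bundled, "registered": registered}
```

Calling check_factory("your-program.jar") returns a small dict. A result where "bundled" is True but "registered" is False would be consistent with the situation described above, where the connector was in the fat jar but only datagen was discovered.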