Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
I attached the full log at the end. After checking some other threads and none applies in my case. here is my observation:
1. dependency check: both flink-connector-kafka and flink-json are included in the final fat jar.
2. resources/META-INF/services/org.apache.flink.table.factories.TableFactory has the following and is included in the final fat jar.
- org.apache.flink.formats.json.JsonRowFormatFactory
also noticed that only identifier datagen is shown in the log. No kafka or json in there.
3. local IntelliJ running fine.
4. same jar on EMR not working
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.analytics_service'.
Table options are:
'connector'='kafka'
'format'='json'
'json.ignore-parse-errors'='true'
'properties.bootstrap.servers'='localhost:9093'
'
properties.group.id'='xxx'
'properties.security.protocol'='SSL'
'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1'
'properties.ssl.key.password'='secret'
'properties.ssl.keystore.location'='xxx.jks'
'properties.ssl.keystore.password'='secret'
'properties.ssl.keystore.type'='JKS'
'properties.ssl.truststore.location'='xxx.jks'
'properties.ssl.truststore.password'='secret'
'properties.ssl.truststore.type'='JKS'
'properties.zookeeper.connect'='localhost:2181'
'scan.startup.mode'='earliest-offset'
'topic'='events'
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at com.coinbase.ml.FeatureStoreJob.runSqlQuery(FeatureStoreJob.scala:133)
at com.coinbase.ml.FeatureStoreJob.run(FeatureStoreJob.scala:36)
at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:30)
at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''.
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
... 43 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 44 more