Re: Flink 1.11.2 could not create kafka table source on EMR.

Posted by r_khachatryan on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-1-11-2-could-not-create-kafka-table-source-on-EMR-tp39512p39541.html

Hi,

Please verify that:
1. The Kafka connector is actually in the fat jar (e.g. with "jar vtf your-program.jar | grep KafkaDynamicTableFactory").
2. The Kafka connector version matches the version of the Flink distribution on EMR.
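
The check in step 1 can also be scripted, which may help if it has to run in a build pipeline. A minimal sketch in Python, relying only on the fact that a jar is a zip archive; find_classes is a hypothetical helper name and the jar path is a placeholder:

```python
import zipfile


def find_classes(jar_path, simple_name):
    """Return the jar entries whose class file is named `simple_name`."""
    top_level = simple_name + ".class"
    suffix = "/" + top_level
    with zipfile.ZipFile(jar_path) as jar:
        # A jar is a zip archive; namelist() yields every entry path in it.
        return [name for name in jar.namelist()
                if name == top_level or name.endswith(suffix)]
```

Calling find_classes("your-program.jar", "KafkaDynamicTableFactory") should return at least one entry if the connector was packaged; an empty list suggests it was left out of the fat jar.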

Regards,
Roman


On Tue, Nov 17, 2020 at 6:47 AM Fanbin Bu <[hidden email]> wrote:
Hi,

I could not launch my Flink 1.11.2 application on EMR. It fails with this exception:

Caused by: org.apache.flink.table.api.ValidationException: 
Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

I attached the full log at the end. I checked some other threads, but none of them applies to my case. Here are my observations:

1. Dependency check: both flink-connector-kafka and flink-json are included in the final fat jar.
2. resources/META-INF/services/org.apache.flink.table.factories.TableFactory is included in the final fat jar and contains the following:
  - org.apache.flink.formats.json.JsonRowFormatFactory
  - org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
  I also noticed that the only factory identifier shown in the log is datagen; neither kafka nor json appears there.
3. Running locally in IntelliJ works fine.
4. The same jar does not work on EMR.
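
A possible follow-up check on item 2: the exception names DynamicTableSourceFactory, and to the best of my knowledge Flink 1.11 discovers such factories through the service file META-INF/services/org.apache.flink.table.factories.Factory, not through the legacy TableFactory file listed above. Fat-jar builds can also overwrite service files instead of merging them. A minimal sketch in Python for listing what a jar actually registers; service_entries is a hypothetical helper name and the jar path is a placeholder:

```python
import zipfile


def service_entries(jar_path, interface):
    """List the implementations a jar registers for a ServiceLoader interface."""
    entry = "META-INF/services/" + interface
    with zipfile.ZipFile(jar_path) as jar:
        if entry not in jar.namelist():
            return []  # no service file for this interface in the jar
        text = jar.read(entry).decode("utf-8")
    # Service files hold one class name per line; '#' starts a comment.
    lines = (line.split("#", 1)[0].strip() for line in text.splitlines())
    return [line for line in lines if line]
```

For example, service_entries("your-program.jar", "org.apache.flink.table.factories.Factory") lists the registered factories. If no Kafka factory appears there, the service files were probably not merged when the fat jar was assembled.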

Please advise. 
Thanks,
Fanbin




Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.analytics_service'.

Table options are:

'connector'='kafka'
'format'='json'
'json.ignore-parse-errors'='true'
'properties.bootstrap.servers'='localhost:9093'
'properties.group.id'='xxx'
'properties.security.protocol'='SSL'
'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1'
'properties.ssl.key.password'='secret'
'properties.ssl.keystore.location'='xxx.jks'
'properties.ssl.keystore.password'='secret'
'properties.ssl.keystore.type'='JKS'
'properties.ssl.truststore.location'='xxx.jks'
'properties.ssl.truststore.password'='secret'
'properties.ssl.truststore.type'='JKS'
'properties.zookeeper.connect'='localhost:2181'
'scan.startup.mode'='earliest-offset'
'topic'='events'
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at com.coinbase.ml.FeatureStoreJob.runSqlQuery(FeatureStoreJob.scala:133)
at com.coinbase.ml.FeatureStoreJob.run(FeatureStoreJob.scala:36)
at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:30)
at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''.
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
... 43 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 44 more