Flink 1.11.2 could not create kafka table source on EMR.


Flink 1.11.2 could not create kafka table source on EMR.

Fanbin Bu
Hi,

I could not launch my flink 1.11.2 application on EMR with exception 

Caused by: org.apache.flink.table.api.ValidationException: 
Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

I attached the full log at the end. I checked several other threads, but none of them applies to my case. Here are my observations:

1. dependency check: both flink-connector-kafka and flink-json are included in the final fat jar.
2. resources/META-INF/services/org.apache.flink.table.factories.TableFactory has the following and is included in the final fat jar.
  - org.apache.flink.formats.json.JsonRowFormatFactory
  - org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
  I also noticed that only the identifier 'datagen' is shown in the log; there is no 'kafka' or 'json' in there.
3. The application runs fine locally in IntelliJ.
4. The same jar does not work on EMR.
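For what it's worth, a common cause of exactly this symptom is the fat-jar build overwriting or discarding duplicate META-INF/services files, so only one connector's factory registrations survive. The thread does not say which build tool is used, so the following is a hypothetical sketch assuming sbt-assembly:

```scala
// build.sbt (hypothetical sketch; applies only if the fat jar is built with sbt-assembly).
// Service files under META-INF/services must be concatenated, not discarded or
// overwritten, or connector factory registrations are silently lost.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
  case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
  case _                                         => MergeStrategy.first
}
```

With Maven, the shade plugin's ServicesResourceTransformer provides the equivalent behavior of merging service files across dependencies.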

Please advise. 
Thanks,
Fanbin




Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.analytics_service'.

Table options are:

'connector'='kafka'
'format'='json'
'json.ignore-parse-errors'='true'
'properties.bootstrap.servers'='localhost:9093'
'properties.group.id'='xxx'
'properties.security.protocol'='SSL'
'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1'
'properties.ssl.key.password'='secret'
'properties.ssl.keystore.location'='xxx.jks'
'properties.ssl.keystore.password'='secret'
'properties.ssl.keystore.type'='JKS'
'properties.ssl.truststore.location'='xxx.jks'
'properties.ssl.truststore.password'='secret'
'properties.ssl.truststore.type'='JKS'
'properties.zookeeper.connect'='localhost:2181'
'scan.startup.mode'='earliest-offset'
'topic'='events'
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at com.coinbase.ml.FeatureStoreJob.runSqlQuery(FeatureStoreJob.scala:133)
at com.coinbase.ml.FeatureStoreJob.run(FeatureStoreJob.scala:36)
at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:30)
at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''.
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
... 43 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 44 more




Re: Flink 1.11.2 could not create kafka table source on EMR.

r_khachatryan
Hi,

Please verify that:
1. kafka-connector is indeed in the fat jar (e.g. by "jar vtf your-program.jar | grep KafkaDynamicTableFactory")
2. the kafka-connector version matches the version of the Flink distribution on EMR.
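Both checks above can also be automated. Here is a small helper (my own sketch, not from the thread; the jar path is a placeholder for your fat jar) that reports whether the Kafka factory class is present and which entries the new-style Factory service file actually registers:

```python
import zipfile

# Service file used by Flink 1.11's DynamicTableFactory discovery.
SERVICE = "META-INF/services/org.apache.flink.table.factories.Factory"

def check_fat_jar(jar_path):
    """Return (kafka_factory_class_present, list of Factory SPI entries)."""
    with zipfile.ZipFile(jar_path) as jar:
        names = jar.namelist()
        # Check 1: is the connector class itself packaged?
        has_kafka = any("KafkaDynamicTableFactory" in n for n in names)
        # Check 2: which factories are registered in the service file?
        entries = []
        if SERVICE in names:
            text = jar.read(SERVICE).decode("utf-8")
            entries = [ln.strip() for ln in text.splitlines()
                       if ln.strip() and not ln.lstrip().startswith("#")]
        return has_kafka, entries
```

If the class file is present but no kafka entry appears in the service file, the registrations were most likely dropped when the fat jar was assembled.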

Regards,
Roman


On Tue, Nov 17, 2020 at 6:47 AM Fanbin Bu <[hidden email]> wrote:
Hi,

I could not launch my flink 1.11.2 application on EMR with exception 

Caused by: org.apache.flink.table.api.ValidationException: 
Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.





Re: Flink 1.11.2 could not create kafka table source on EMR.

Fanbin Bu
All of those are verified.

The issue is fixed by adding
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory to the META-INF/services/org.apache.flink.table.factories.Factory service file in the fat jar.
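For context: in Flink 1.11, DDL tables declared with 'connector'='kafka' are discovered through the new DynamicTableFactory SPI, which reads META-INF/services/org.apache.flink.table.factories.Factory. The org.apache.flink.table.factories.TableFactory file mentioned earlier registers only the legacy factories, which is why only 'datagen' showed up as available. After the fix, the Factory service file should contain entries like:

```
org.apache.flink.formats.json.JsonFormatFactory
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
```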

Thanks,
Fanbin



On Tue, Nov 17, 2020 at 7:29 AM Khachatryan Roman <[hidden email]> wrote:
Hi,

Please verify that:
1. kafka-connector is indeed in the fat jar (e.g. by "jar vtf your-program.jar | grep KafkaDynamicTableFactory")
2. kafka-connector version matches the version of Flink distribution on EMR.
