Flink 1.7 doesn't work with Kafka Table Source Descriptors

dhanuka.priyanath
Hi All,

I am trying to read data from Kafka in Flink using the Table API. It works fine with Flink 1.4, but after upgrading to 1.7 I get the error below. I have attached a list of the libraries added to Flink.

Could you please help me with this?

bin/flink run stream-analytics-0.0.1-SNAPSHOT.jar --read-topic testin --write-topic testout --bootstrap.servers localhost --group.id analytics
Starting execution of program
java.lang.AbstractMethodError: org.apache.flink.table.descriptors.ConnectorDescriptor.toConnectorProperties()Ljava/util/Map;
    at org.apache.flink.table.descriptors.ConnectorDescriptor.toProperties(ConnectorDescriptor.java:58)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.toProperties(ConnectTableDescriptor.scala:107)
    at org.apache.flink.table.descriptors.StreamTableDescriptor.toProperties(StreamTableDescriptor.scala:95)
    at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:39)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
    at org.monitoring.stream.analytics.FlinkTableSourceLatest.main(FlinkTableSourceLatest.java:82)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)

Cheers,
Dhanuka

--
Nothing Impossible,Creativity is more important than knowledge.

Attachments: FlinkTableSourceLatest.java (6K), flink-libs.png (172K)

Re: Flink 1.7 doesn't work with Kafka Table Source Descriptors

dhanuka.priyanath
Adding the dev group.

On Fri, Dec 21, 2018 at 6:21 PM dhanuka ranasinghe <[hidden email]> wrote:

Re: Flink 1.7 doesn't work with Kafka Table Source Descriptors

Hequn Cheng
Hi dhanuka,

I could not reproduce your error with release-1.7.0. It seems Kafka.toConnectorProperties() should be called instead of ConnectorDescriptor.toConnectorProperties(); the latter is an abstract method, which leads to the AbstractMethodError.

In the picture you uploaded, jars from 1.6.1 are mixed with jars from 1.7.0, which is strange and may cause class conflicts.
Furthermore, set the scope of the Flink dependencies to provided so that Flink classes are not packaged into the user jar; packaging them can also cause class conflicts.
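
A quick way to check for the mixed-version situation described above is to print which jar a class is actually loaded from. The following is a minimal sketch using only the JDK; the class name WhichJar and the idea of passing class names as arguments are illustrative assumptions, not part of Flink.

```java
import java.security.CodeSource;

class WhichJar {

    /** Returns the location a class was loaded from, or a note if it is unknown. */
    static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        // Classes loaded by the JDK bootstrap loader usually have no code source.
        if (source == null || source.getLocation() == null) {
            return "(no code source, probably a JDK class)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Assumption: fully qualified class names to inspect are passed as arguments.
        // Without arguments, the sketch simply inspects itself.
        String[] names = args.length > 0 ? args : new String[] {WhichJar.class.getName()};
        for (String name : names) {
            Class<?> clazz = Class.forName(name);
            System.out.println(name + " -> " + locationOf(clazz));
        }
    }
}
```

Run it with the same classpath as the job and pass, for example, org.apache.flink.table.descriptors.ConnectorDescriptor and org.apache.flink.table.descriptors.Kafka as arguments. If the two locations point at jars of different Flink versions, that would be consistent with the AbstractMethodError.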

Best, 
Hequn


On Fri, Dec 21, 2018 at 6:24 PM dhanuka ranasinghe <[hidden email]> wrote:

Re: Flink 1.7 doesn't work with Kafka Table Source Descriptors

dhanuka.priyanath
Hi Cheng,

Thanks for your reply. I will try it out and update you on this.

Cheers,
Dhanuka


On Sat, 22 Dec 2018, 20:41 Hequn Cheng <[hidden email]> wrote:

Re: Flink 1.7 doesn't work with Kafka Table Source Descriptors

dhanuka.priyanath
In reply to this post by Hequn Cheng
Hi Cheng,

I have removed the 1.6.1 jars, and now I get the error below:

Starting execution of program

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=localhost:2181
connector.properties.1.key=group.id
connector.properties.1.value=analytics
connector.properties.2.key=bootstrap.servers
connector.properties.2.value=localhost:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=testin
connector.type=kafka
connector.version=universal
format.fail-on-missing-field=false
format.json-schema={\n  \"type\": \"object\",\n  \"properties\": {\n    \"food\": {\n      \"type\": \"string\"\n    },\n    \"price\": {\n      \"type\": \"integer\"\n    },\n    \"processingTime\": {\n      \"type\": \"integer\"\n    }\n  }\n}
format.property-version=1
format.type=json
schema.0.type=VARCHAR
schema.1.type=DECIMAL
schema.2.name=processingTime
schema.2.proctime=true
schema.2.type=TIMESTAMP
update-mode=append

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory

at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
at org.monitoring.stream.analytics.FlinkTableSourceLatest.main(FlinkTableSourceLatest.java:97)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)



Re: Flink 1.7 doesn't work with Kafka Table Source Descriptors

Hequn Cheng
Hi Dhanuka,

From the exception, it seems you have changed the Kafka version to 'universal'. You can solve the problem in either of the following ways:
- Change the Kafka version to 0.11; your lib folder only contains the jar for version 0.11.
- Add flink-connector-kafka_2.11-1.7.0.jar to your lib folder if you want to use 'universal'.
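
To illustrate why the exception reports "No context matches", here is a simplified model of the matching step. It is a sketch, not Flink's actual code: the class ContextMatchSketch and the helper kafkaContext are hypothetical, and it assumes that each Kafka factory requires connector.type=kafka plus its own connector.version.

```java
import java.util.HashMap;
import java.util.Map;

class ContextMatchSketch {

    /** True if every key/value the factory requires appears in the requested properties. */
    static boolean matches(Map<String, String> requiredContext, Map<String, String> requested) {
        for (Map.Entry<String, String> required : requiredContext.entrySet()) {
            if (!required.getValue().equals(requested.get(required.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** Hypothetical required context of a Kafka factory for the given connector version. */
    static Map<String, String> kafkaContext(String version) {
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "kafka");
        context.put("connector.version", version);
        return context;
    }

    public static void main(String[] args) {
        // A subset of the requested properties from the error message above.
        Map<String, String> requested = new HashMap<>();
        requested.put("connector.type", "kafka");
        requested.put("connector.version", "universal");
        requested.put("connector.topic", "testin");

        // The first three versions correspond to the factories listed as "considered".
        for (String version : new String[] {"0.9", "0.10", "0.11", "universal"}) {
            System.out.println("factory for " + version + " matches: "
                    + matches(kafkaContext(version), requested));
        }
    }
}
```

Under these assumptions, only a factory for 'universal' would match the requested properties, which is why either changing the requested version or adding the universal connector jar resolves the mismatch.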

Best, Hequn

On Sun, Dec 23, 2018 at 8:48 PM dhanuka ranasinghe <[hidden email]> wrote:

Re: Flink 1.7 doesn't work with Kafka Table Source Descriptors

dhanuka.priyanath
Hi Cheng,

Thanks for your prompt reply. I was able to figure out the problem. My mistake was not properly configuring org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory as a TableSourceFactory in META-INF/services.
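
For anyone debugging a similar META-INF/services problem, the sketch below lists every service-file entry visible on the classpath, using only the JDK. The class name ListServiceEntries is hypothetical, and the default service name org.apache.flink.table.factories.TableFactory is an assumption about the file Flink 1.7 reads for table factory discovery; pass a different name as the first argument if your setup differs.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

class ListServiceEntries {

    /** Reads every copy of META-INF/services/<serviceName> visible to the class loader. */
    static List<String> entriesFor(String serviceName, ClassLoader loader) {
        List<String> entries = new ArrayList<>();
        try {
            Enumeration<URL> files = loader.getResources("META-INF/services/" + serviceName);
            while (files.hasMoreElements()) {
                URL file = files.nextElement();
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(file.openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        line = line.trim();
                        // Skip blank lines and full-line comments such as license headers.
                        if (!line.isEmpty() && !line.startsWith("#")) {
                            entries.add(file + " -> " + line);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return entries;
    }

    public static void main(String[] args) {
        // Assumption: service file name used for table factory discovery.
        String service = args.length > 0
                ? args[0]
                : "org.apache.flink.table.factories.TableFactory";
        for (String entry : entriesFor(service, Thread.currentThread().getContextClassLoader())) {
            System.out.println(entry);
        }
    }
}
```

If the factory you expect is missing from the output when run against the user jar, one possible cause (an assumption, since the thread does not say how the jar was built) is that several service files with the same name were collapsed into one during packaging.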

Cheers,
Dhanuka

On Mon, Dec 24, 2018 at 10:15 AM Hequn Cheng <[hidden email]> wrote:
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
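
One way to read "No context matches" in the trace above: the request asks for connector.version=universal, while the Kafka factories that were considered are the 0.9, 0.10 and 0.11 ones. The following is a minimal sketch of that kind of matching rule in plain Java with no Flink dependency. The class ContextMatchSketch, its methods, and the two-key context are assumptions made for illustration; this is not Flink's actual lookup code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: names and structure are hypothetical, not Flink's API.
class ContextMatchSketch {

    // A factory is a candidate only if every key of its required context
    // appears in the requested properties with exactly the same value.
    static boolean matches(Map<String, String> requiredContext, Map<String, String> requested) {
        for (Map.Entry<String, String> entry : requiredContext.entrySet()) {
            if (!entry.getValue().equals(requested.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Assumed context of a Kafka factory that serves one connector version.
    static Map<String, String> kafkaContext(String version) {
        Map<String, String> context = new LinkedHashMap<>();
        context.put("connector.type", "kafka");
        context.put("connector.version", version);
        return context;
    }

    // Returns the connector versions whose assumed context matches the request.
    static List<String> matchingVersions(List<String> availableVersions, Map<String, String> requested) {
        List<String> result = new ArrayList<>();
        for (String version : availableVersions) {
            if (matches(kafkaContext(version), requested)) {
                result.add(version);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> requested = new LinkedHashMap<>();
        requested.put("connector.type", "kafka");
        requested.put("connector.version", "universal");

        // Versions served by the Kafka factories listed in the error message.
        List<String> onClasspath = Arrays.asList("0.9", "0.10", "0.11");
        System.out.println("Matching factories: " + matchingVersions(onClasspath, requested));
    }
}
```

Under this model the request matches nothing until a factory whose context says "universal" is on the classpath, or until the requested connector.version is changed to one of the versions that is available.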


On Sat, Dec 22, 2018 at 8:41 PM Hequn Cheng <[hidden email]> wrote:
Hi dhanuka,

I failed to reproduce your error with release-1.7.0. It seems that Kafka.toConnectorProperties() should be called instead of ConnectorDescriptor.toConnectorProperties(); the latter is an abstract method, which leads to the AbstractMethodError.
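
To illustrate the call path in the stack trace, here is a simplified model of a base descriptor whose toProperties() delegates to an abstract toConnectorProperties(). The classes DescriptorModel, KafkaDescriptorModel and DescriptorDemo are hypothetical stand-ins written for this sketch and are not the Flink classes. When the concrete class supplies the method, the call resolves normally. An AbstractMethodError can only appear at run time, when the concrete class that gets loaded was compiled against a different version of the base class and so does not provide that method.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for an abstract connector descriptor (not Flink's class).
abstract class DescriptorModel {
    private final String type;

    DescriptorModel(String type) {
        this.type = type;
    }

    // Template method: adds the common key, then asks the concrete class for the rest.
    final Map<String, String> toProperties() {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("connector.type", type);
        // Resolved at run time on the concrete class that was actually loaded.
        properties.putAll(toConnectorProperties());
        return properties;
    }

    // Every concrete descriptor has to provide this method.
    protected abstract Map<String, String> toConnectorProperties();
}

// Hypothetical Kafka descriptor that implements the abstract method.
class KafkaDescriptorModel extends DescriptorModel {
    private final String topic;

    KafkaDescriptorModel(String topic) {
        super("kafka");
        this.topic = topic;
    }

    @Override
    protected Map<String, String> toConnectorProperties() {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("connector.topic", topic);
        return properties;
    }
}

class DescriptorDemo {
    public static void main(String[] args) {
        // Prints the merged common and connector-specific properties.
        System.out.println(new KafkaDescriptorModel("testin").toProperties());
    }
}
```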

From the picture you uploaded, it is strange that the 1.6.1 jars are mixed with the 1.7.0 jars. This may result in class conflicts.
Furthermore, set the scope of the Flink dependencies to provided so that Flink classes are not packaged into the user jar; packaging them can also cause class conflicts.
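
A quick way to spot such a mix is to group the Flink jars in the lib folder by the version in their file names. Below is a minimal sketch, assuming the jars follow the usual flink-<module>-<version>.jar naming. The class FlinkJarVersionCheck and the sample file names in main are made up for illustration and are not taken from the attached picture.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: reports whether jar names carry more than one Flink version.
class FlinkJarVersionCheck {

    // Matches names such as flink-connector-kafka_2.11-1.7.0.jar and captures "1.7.0".
    private static final Pattern FLINK_JAR =
            Pattern.compile("^flink-.*?-(\\d+\\.\\d+\\.\\d+)\\.jar$");

    // Groups Flink jar names by version; names that do not match are ignored.
    static Map<String, Set<String>> groupByVersion(List<String> jarNames) {
        Map<String, Set<String>> byVersion = new TreeMap<>();
        for (String name : jarNames) {
            Matcher matcher = FLINK_JAR.matcher(name);
            if (matcher.matches()) {
                byVersion.computeIfAbsent(matcher.group(1), v -> new TreeSet<>()).add(name);
            }
        }
        return byVersion;
    }

    static boolean hasMixedVersions(List<String> jarNames) {
        return groupByVersion(jarNames).size() > 1;
    }

    public static void main(String[] args) {
        // Use the directory given as the first argument, else made-up sample names.
        List<String> names = Arrays.asList(
                "flink-connector-kafka_2.11-1.7.0.jar",
                "flink-connector-kafka-0.11_2.11-1.6.1.jar",
                "flink-json-1.7.0.jar");
        if (args.length > 0) {
            String[] listed = new File(args[0]).list();
            if (listed != null) {
                names = Arrays.asList(listed);
            }
        }
        System.out.println("Versions found: " + groupByVersion(names));
        System.out.println("Mixed versions: " + hasMixedVersions(names));
    }
}
```

Running it with the path of the lib folder as the only argument lists each version together with the jars that carry it.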

Best, 
Hequn


On Fri, Dec 21, 2018 at 6:24 PM dhanuka ranasinghe <[hidden email]> wrote:
Add Dev Group

On Fri, Dec 21, 2018 at 6:21 PM dhanuka ranasinghe <[hidden email]> wrote:
Hi All,

I have tried to read data from Kafka in Flink using the Table API. It works fine with Flink 1.4, but after upgrading to 1.7 it gives me the error below. I have attached the libraries added to Flink.

Could you please help me with this?

bin/flink run stream-analytics-0.0.1-SNAPSHOT.jar --read-topic testin --write-topic testout --bootstrap.servers localhost --group.id analytics
Starting execution of program
java.lang.AbstractMethodError: org.apache.flink.table.descriptors.ConnectorDescriptor.toConnectorProperties()Ljava/util/Map;
    at org.apache.flink.table.descriptors.ConnectorDescriptor.toProperties(ConnectorDescriptor.java:58)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.toProperties(ConnectTableDescriptor.scala:107)
    at org.apache.flink.table.descriptors.StreamTableDescriptor.toProperties(StreamTableDescriptor.scala:95)
    at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:39)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
    at org.monitoring.stream.analytics.FlinkTableSourceLatest.main(FlinkTableSourceLatest.java:82)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)

Cheers,
Dhanuka

--
Nothing Impossible,Creativity is more important than knowledge.

