Hi All, I have tried to read data from Kafka from Flink using Table API. It's working fine with Flink 1.4 but when upgrade to 1.7 given me below error. I have attached the libraries added to Flink. Could you please help me on this. bin/flink run stream-analytics-0.0.1-SNAPSHOT.jar --read-topic testin --write-topic testout --bootstrap.servers localhost --group.id analytics Starting execution of program java.lang.AbstractMethodError: org.apache.flink.table.descriptors.ConnectorDescriptor.toConnectorProperties()Ljava/util/Map; at org.apache.flink.table.descriptors.ConnectorDescriptor.toProperties(ConnectorDescriptor.java:58) at org.apache.flink.table.descriptors.ConnectTableDescriptor.toProperties(ConnectTableDescriptor.scala:107) at org.apache.flink.table.descriptors.StreamTableDescriptor.toProperties(StreamTableDescriptor.scala:95) at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:39) at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46) at org.monitoring.stream.analytics.FlinkTableSourceLatest.main(FlinkTableSourceLatest.java:82) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) Cheers, Dhanuka -- Nothing Impossible,Creativity is more important than knowledge. |
Add Dev Group On Fri, Dec 21, 2018 at 6:21 PM dhanuka ranasinghe <[hidden email]> wrote:
-- Nothing Impossible,Creativity is more important than knowledge.
|
Hi dhanuka, I failed to reproduce your error with release-1.7.0. It seems Kafka.toConnectorProperties() should be called instead of ConnectorDescriptor.toConnectorProperties(), the latter one is an abstract class, which lead to the AbstractMethodError. From the picture uploaded, it is strange that the jar of 1.6.1 is mixed with the jar of 1.7.0. It may result in class conflict problem. Furthermore, set flink dependency scope to provided, so that classes of flink will not be packaged into the user jar. It will also cause class conflict problem. Best, Hequn On Fri, Dec 21, 2018 at 6:24 PM dhanuka ranasinghe <[hidden email]> wrote:
|
Hi Cheng, Thanks for your reply will try out and update you on this. Cheers, Dhanuka On Sat, 22 Dec 2018, 20:41 Hequn Cheng <[hidden email] wrote:
|
In reply to this post by Hequn Cheng
Hi Cheng, I have removed 1.6.1 jars and then I got below error Starting execution of program ------------------------------------------------------------ The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in the classpath. Reason: No context matches. The following properties are requested: connector.properties.0.key=zookeeper.connect connector.properties.0.value=localhost:2181 connector.properties.1.key=group.id connector.properties.1.value=analytics connector.properties.2.key=bootstrap.servers connector.properties.2.value=localhost:9092 connector.property-version=1 connector.startup-mode=latest-offset connector.topic=testin connector.type=kafka connector.version=universal format.fail-on-missing-field=false format.json-schema={\n \"type\": \"object\",\n \"properties\": {\n \"food\": {\n \"type\": \"string\"\n },\n \"price\": {\n \"type\": \"integer\"\n },\n \"processingTime\": {\n \"type\": \"integer\"\n }\n }\n} format.property-version=1 format.type=json schema.0.name=food schema.0.type=VARCHAR schema.1.name=price schema.1.type=DECIMAL schema.2.name=processingTime schema.2.proctime=true schema.2.type=TIMESTAMP update-mode=append The following factories have been considered: org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory org.apache.flink.formats.json.JsonRowFormatFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.table.sinks.CsvBatchTableSinkFactory org.apache.flink.table.sinks.CsvAppendTableSinkFactory at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214) at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130) at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81) at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49) at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46) at org.monitoring.stream.analytics.FlinkTableSourceLatest.main(FlinkTableSourceLatest.java:97) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) On Sat, Dec 22, 2018 at 8:41 PM Hequn Cheng <[hidden email]> wrote:
Nothing Impossible,Creativity is more important than knowledge.
|
Hi Dhanuka, From the exceptions, it seems you have changed the Kafka version to 'universal'. You can solve your problem in any of the following ways: - Change Kafka version to 0.11. You only have a jar of 0.11 version in your lib folder. - Add flink-connector-kafka_2.11-1.7.0.jar to your lib folder if you want to use 'universal'. Best, Hequn On Sun, Dec 23, 2018 at 8:48 PM dhanuka ranasinghe <[hidden email]> wrote:
|
Hi Cheng, Thanks for your prompt reply. I was able to figured out the problem. What I got wrong was not properly configuring org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory as as TableSourceFactory in META-INF/services. Cheers, Dhanuka On Mon, Dec 24, 2018 at 10:15 AM Hequn Cheng <[hidden email]> wrote:
Nothing Impossible,Creativity is more important than knowledge.
|
Free forum by Nabble | Edit this page |