Have you seen this error by any chance in flink streaming with Kafka please? org.apache.flink.client.program.ProgramInvocationException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) at md_streaming$.main(md_streaming.scala:30) at md_streaming.main(md_streaming.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020) at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096) Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243) thanks Dr Mich Talebzadeh
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
|
Looks like a version issue , have you made sure that the Kafka version is compatible?
|
In reply to this post by Mich Talebzadeh
Hi, This is the code import java.util.Properties import java.util.Arrays import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.flink.streaming.util.serialization.DeserializationSchema import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; object md_streaming { def main(args: Array[String]) { val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "rhes75:9092") properties.setProperty("zookeeper.connect", "rhes75:2181") properties.setProperty("group.id", "md_streaming") val stream = env .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties)) .writeAsText("/tmp/md_streaming.txt") env.execute("Flink Kafka Example") } and this is the sbt dependencies libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0" libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0" libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0" libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0" libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0" Thanks Dr Mich Talebzadeh
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
On Mon, 2 Jul 2018 at 17:45, Ted Yu <[hidden email]> wrote:
|
This is becoming very tedious. As suggested I changed the kafka dependency from ibraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0" to libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0" and compiled and ran the job again anf failed. This is the log file 2018-07-02 21:38:38,656 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.1.0 2018-07-02 21:38:38,656 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fdcf75ea326b8e07 2018-07-02 21:38:38,696 INFO org.apache.kafka.clients.Metadata - Cluster ID: 3SqEt4DcTruOr_SlQ6fqTQ 2018-07-02 21:38:38,698 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='md', partition=0}] 2018-07-02 21:38:38,702 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [rhes75:9092] check.crcs = true client.id = connections.max.idle.ms = 540000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = md_streaming heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 305000 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer 2018-07-02 21:38:38,705 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.1.0 2018-07-02 21:38:38,705 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fdcf75ea326b8e07 2018-07-02 21:38:38,705 WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-2 at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:785) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:482) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:171) 2018-07-02 21:38:38,709 WARN org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - Error while closing Kafka consumer java.lang.NullPointerException at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:286) 2018-07-02 21:38:38,710 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b) switched from RUNNING to FAILED. java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243) 2018-07-02 21:38:38,713 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b). 2018-07-02 21:38:38,713 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160d Dr Mich Talebzadeh
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
On Mon, 2 Jul 2018 at 20:59, Ted Yu <[hidden email]> wrote:
|
Hi Mich, FlinkKafkaConsumer09 is the connector for Kafka 0.9.x. Have you tried to use FlinkKafkaConsumer011 instead of FlinkKafkaConsumer09? Best, Fabian 2018-07-02 22:57 GMT+02:00 Mich Talebzadeh <[hidden email]>:
|
Hi Fabian. Thanks. Great contribution! It is working info] SHA-1: 98d78b909631e5d30664df6a7a4a3f421d4fd33b [info] Packaging /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/md_streaming-assembly-1.0.jar ... [info] Done packaging. [success] Total time: 14 s, completed Jul 3, 2018 9:32:25 AM Completed compiling Tue Jul 3 09:32:25 BST 2018 , Running in **Standalone mode** Starting execution of program And the test prices generated in the topic are added to the file for each security ad534c19-fb77-4966-86a8-dc411d7a0607,MKS,2018-07-03T09:50:03,319.2 e96e5d96-03fc-4603-899f-5ea5151fc280,IBM,2018-07-03T09:50:07,124.67 a64a51a4-f27c-439b-b9cc-28dc593acab3,MRW,2018-07-03T09:50:07,286.27 2d15089d-3635-4a7e-b2d5-20b92dd67186,MSFT,2018-07-03T09:50:07,22.8 415d1215-18e6-46ca-a771-98c614e8c3fb,ORCL,2018-07-03T09:50:07,32.57 e0dd5832-20bd-4951-a4dd-3a3a10c99a01,SAP,2018-07-03T09:50:07,69.32 4222eea9-d9a7-46e1-8b1e-c2b634170fad,SBRY,2018-07-03T09:50:07,235.22 4f2e0927-29ff-44ff-aa2e-9b16fdcd0024,TSCO,2018-07-03T09:50:07,403.64 49437f8b-5e2b-42e9-b3d7-f95eee9c5432,VOD,2018-07-03T09:50:07,239.08 0f96463e-40a5-47c5-b2c6-7191992ab0b1,BP,2018-07-03T09:50:07,587.75 d0041bf1-a313-4623-a7cc-2ce8204590bb,MKS,2018-07-03T09:50:07,406.02 c443ac53-f762-4fad-b11c-0fd4e98812fb,IBM,2018-07-03T09:50:10,168.52 67f2d372-f918-445e-8dac-7556a2dfd0aa,MRW,2018-07-03T09:50:10,293.2 57372392-53fd-48eb-aa94-c317c75d6545,MSFT,2018-07-03T09:50:10,46.53 c3839c12-be63-416c-a404-8d0333071559,ORCL,2018-07-03T09:50:10,31.57 29eca46c-bd4c-475e-a9c9-7bf105fcc935,SAP,2018-07-03T09:50:10,77.81 89f98ad0-dc34-476f-baa5-fc2fa92aa2d5,SBRY,2018-07-03T09:50:10,239.95 431494f3-1215-48ae-a534-5bf3fbe20f2f,TSCO,2018-07-03T09:50:10,331.12 2203095f-8826-424d-a1e3-fa212194ac35,VOD,2018-07-03T09:50:10,232.05 816ddc9b-f403-4ea9-8d55-c3afd0eae110,BP,2018-07-03T09:50:10,506.4 23c07878-d64d-4d1e-84a4-c14c23357467,MKS,2018-07-03T09:50:10,473.06 kind regards, Dr Mich Talebzadeh
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
On Tue, 3 Jul 2018 at 09:11, Fabian Hueske <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |