PyFlink Kafka-Connector NoClassDefFoundError

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

PyFlink Kafka-Connector NoClassDefFoundError

G.G.M.5611

Hi,
I am trying to run a very basic job in PyFlink (getting Data from a Kafka-Topic and printing the stream).

In the command line I run:

./bin/flink run \
--python /home/ubuntu/load_kafka.py \
--jarfile /home/ubuntu/flink-connector-kafka_2.12-1.12.2.jar

I downloaded the jar from:
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka

Now I get the following error:

File "/home/ubuntu/load_kafka.py", line 16, in <module>
    kafka_consumer = FlinkKafkaConsumer("twitter-stream-source", json_row_schema, kafka_props)
  File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/datastream/connectors.py", line 179, in __init__
  File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/datastream/connectors.py", line 329, in _get_kafka_consumer
  File "/home/ubuntu/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1553, in __call__
  File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/home/ubuntu/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:139)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:238)
        at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 15 more

org.apache.flink.client.program.ProgramAbortException
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

I'm thinking that I might be providing the wrong jar, but don't really have any idea.
This is my code:

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

type_info = Types.ROW([Types.ROW([Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT()]), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()]),
Types.ROW([Types.ROW([Types.ROW([Types.BOOLEAN(), Types.STRING(), Types.STRING(), Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT()]), Types.STRING()])])]),
Types.ROW([Types.ROW([Types.STRING(), Types.STRING()])])])

json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'twitter_consumers'}
kafka_consumer = FlinkKafkaConsumer("twitter-stream-source", json_row_schema, kafka_props)
# research this
kafka_consumer.set_start_from_earliest()

ds = env.add_source(kafka_consumer)
ds.print()
ds.execute()

Thanks a lot!
Reply | Threaded
Open this post in threaded view
|

Re: PyFlink Kafka-Connector NoClassDefFoundError

Dian Fu
Hi,

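You need to use the fat jar [1] as documented in the Kafka Table & SQL connector page [2]. The thin flink-connector-kafka jar does not bundle the kafka-clients dependency, which is why classes such as org.apache.kafka.common.serialization.ByteArrayDeserializer cannot be found.

[1] https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.2/flink-sql-connector-kafka_2.11-1.12.2.jar
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html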


Regards,
Dian

On 19 April 2021 at 1:26 AM, [hidden email] wrote:


Hi,
I am trying to run a very basic job in PyFlink (getting Data from a Kafka-Topic and printing the stream).

In the command line I run:

./bin/flink run \
--python /home/ubuntu/load_kafka.py \
--jarfile /home/ubuntu/flink-connector-kafka_2.12-1.12.2.jar

I downloaded the jar from:
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka

Now I get the following error:

File "/home/ubuntu/load_kafka.py", line 16, in <module>
   kafka_consumer = FlinkKafkaConsumer("twitter-stream-source", json_row_schema, kafka_props)
 File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/datastream/connectors.py", line 179, in __init__
 File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/datastream/connectors.py", line 329, in _get_kafka_consumer
 File "/home/ubuntu/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1553, in __call__
 File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
 File "/home/ubuntu/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:139)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:238)
at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 15 more

org.apache.flink.client.program.ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

I'm thinking that I might be providing the wrong jar, but don't really have any idea.
This is my code:

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

type_info = Types.ROW([Types.ROW([Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT()]), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()]),
Types.ROW([Types.ROW([Types.ROW([Types.BOOLEAN(), Types.STRING(), Types.STRING(), Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT()]), Types.STRING()])])]),
Types.ROW([Types.ROW([Types.STRING(), Types.STRING()])])])

json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'twitter_consumers'}
kafka_consumer = FlinkKafkaConsumer("twitter-stream-source", json_row_schema, kafka_props)
# research this
kafka_consumer.set_start_from_earliest()

ds = env.add_source(kafka_consumer)
ds.print()
ds.execute()

Thanks a lot!

Reply | Threaded
Open this post in threaded view
|

Aw: Re: PyFlink Kafka-Connector NoClassDefFoundError

G.G.M.5611
Hi Dian,
thanks, that did the trick.
Unfortunately, I have a new problem now.
 
As I said, I'm trying to read JSON data from a Kafka topic into a DataStream. I tried doing this using the JsonRowDeserializationSchema class as below (the JSON objects are tweets and are therefore deeply nested and complex). However, that causes the following error:
 

Traceback (most recent call last):
  File "/home/ubuntu/load_kafka.py", line 21, in <module>
    env.execute()
  File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 623, in execute
  File "/home/ubuntu/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1285, in __call__
  File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/home/ubuntu/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 00392ec565588adc82c8dc405fdb9e1d)
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 00392ec565588adc82c8dc405fdb9e1d)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
    ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Failed to deserialize JSON '
'.
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:518)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:266)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:143)
    ... 9 more
org.apache.flink.client.program.ProgramAbortException
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
 
The JSON objects look like this (modified for privacy reasons):


{"data":{"text":"xy",
         "public_metrics":{"retweet_count":0,"reply_count":0,"like_count":0,"quote_count":0},
         "author_id":"123","id":"123","created_at":"00000",
         "source":"Twitter for Android","lang":"in"},
 "includes":{"users":[{"protected":false,"id":"123",
         "name":"nobody","created_at":"000",
         "public_metrics":{"followers_count":0,"following_count":0,"tweet_count":557,"listed_count":0},"username":"nobody"}]},
 "matching_rules":[{"id":123,"tag":"nothing"}]}
 
I'm guessing that one reason my code isn't working could be that I specify

"matching_rules":[{"id":123,"tag":"nothing"}]

as

Types.ROW([Types.ROW([Types.STRING(), Types.STRING()])])

I'm aware that I would need something like Types.ARRAY here, but it seems to support only primitive types as array elements, not rows.

Given that the JsonRowDeserializationSchema didn't work, I tried to do it with the SimpleStringSchema as follows:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'twitter_consumers'}
kafka_consumer = FlinkKafkaConsumer("twitter-stream", SimpleStringSchema(), kafka_props)

ds = env.add_source(kafka_consumer)
ds.print()
env.execute()

But this didn't work either: nothing happens (no output at all).

Do you have any idea what I'm doing wrong? Or should I open a new thread for this? Unfortunately, there is very little documentation about this kind of thing and how to use it.

Thanks a lot,
Giacomo
 
 

Sent: Monday, 19 April 2021 at 03:51
From: "Dian Fu" <[hidden email]>
To: [hidden email]
Cc: "user" <[hidden email]>
Subject: Re: PyFlink Kafka-Connector NoClassDefFoundError

Hi,
 
You need to use the fat jar [1] as documented in the Kafka Table & SQL connector page [2].
 
[1] https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.2/flink-sql-connector-kafka_2.11-1.12.2.jar
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html
 
Regards,
Dian
 
On 19 April 2021 at 1:26 AM, [hidden email] wrote:

Hi,
I am trying to run a very basic job in PyFlink (getting Data from a Kafka-Topic and printing the stream).

In the command line I run:

./bin/flink run \
--python /home/ubuntu/load_kafka.py \
--jarfile /home/ubuntu/flink-connector-kafka_2.12-1.12.2.jar

I downloaded the jar from:
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka

Now I get the following error:

File "/home/ubuntu/load_kafka.py", line 16, in <module>
   kafka_consumer = FlinkKafkaConsumer("twitter-stream-source", json_row_schema, kafka_props)
 File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/datastream/connectors.py", line 179, in __init__
 File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/datastream/connectors.py", line 329, in _get_kafka_consumer
 File "/home/ubuntu/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1553, in __call__
 File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
 File "/home/ubuntu/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:139)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:238)
at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 15 more

org.apache.flink.client.program.ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

I'm thinking that I might be providing the wrong jar, but don't really have any idea.
This is my code:

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

# Read JSON tweets from the Kafka topic "twitter-stream-source" and print them.
# The job needs the Kafka client classes on the classpath, so submit it with the
# flink-sql-connector-kafka fat jar (via --jarfile), not the thin
# flink-connector-kafka jar.

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# NOTE(review): Types.ROW gives fields positional names (f0, f1, ...), but
# JsonRowDeserializationSchema looks up JSON keys by field name, so these
# fields will most likely come out as null. Consider Types.ROW_NAMED with the
# real JSON keys ("data", "includes", "matching_rules", ...) -- TODO confirm.
# NOTE(review): in the tweet JSON, "includes.users" and "matching_rules" are
# arrays of objects, but they are modeled as rows here; they need an
# array-of-row type if the installed PyFlink version supports one.
type_info = Types.ROW([Types.ROW([Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT()]), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()]),
Types.ROW([Types.ROW([Types.ROW([Types.BOOLEAN(), Types.STRING(), Types.STRING(), Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT()]), Types.STRING()])])]),
Types.ROW([Types.ROW([Types.STRING(), Types.STRING()])])])

json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'twitter_consumers'}
kafka_consumer = FlinkKafkaConsumer("twitter-stream-source", json_row_schema, kafka_props)
# Start from the earliest offset of each partition instead of the consumer
# group's committed offsets, so records already in the topic are read too.
kafka_consumer.set_start_from_earliest()

ds = env.add_source(kafka_consumer)
ds.print()
# Jobs are submitted through the execution environment; a DataStream has no
# execute() method.
env.execute()

Thanks a lot!
Reply | Threaded
Open this post in threaded view
|

Re: PyFlink Kafka-Connector NoClassDefFoundError

Dian Fu
Hi Giacomo,

AFAIK, it should support accepting row type as the array elements. Did you encounter some problems? Besides, it would be great if you could share a minimal example which could reproduce the above exception (along with the test data).

Regards,
Dian

> 2021年4月20日 上午12:20,[hidden email] 写道:
>
> Hi Dian,
> thanks, that did the trick.
> Unfortunately, I have a new problem now.
>  
> As I said, I'm trying to read JSON data from a Kafka topic into a DataStream. I tried doing this using the JsonRowDeserializationSchema class as below (the JSON objects are tweets and thus deeply nested and complex). However, that causes the following error:
>  
>
> Traceback (most recent call last):
>   File "/home/ubuntu/load_kafka.py", line 21, in <module>
>     env.execute()
>   File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 623, in execute
>   File "/home/ubuntu/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1285, in __call__
>   File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
>   File "/home/ubuntu/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o1.execute.
> : java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 00392ec565588adc82c8dc405fdb9e1d)
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
>     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 00392ec565588adc82c8dc405fdb9e1d)
>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
>     ... 19 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Failed to deserialize JSON '
> '.
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
> Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:518)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:266)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:143)
>     ... 9 more
> org.apache.flink.client.program.ProgramAbortException
>     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>  
> The JSON objects look like this (modified for privacy reasons):
>
>
> {"data":{"text":"xy",
>         "public_metrics":{"retweet_count":0,"reply_count":0,"like_count":0,"quote_count":0},
>         "author_id":"123","id":"123","created_at":"00000",
>         "source":"Twitter for Android","lang":"in"},
> "includes":{"users":[{"protected":false,"id":"123",
>         "name":"nobody","created_at":"000",
>         "public_metrics":{"followers_count":0,"following_count":0,"tweet_count":557,"listed_count":0},"username":"nobody"}]},
> "matching_rules":[{"id":123,"tag":"nothing"}]}
>  
> I'm guessing that one reason my code isn't working could be that I specify
>
> "matching_rules":[{"id":123,"tag":"nothing"}]}
>
> as
>
> Types.ROW([Types.ROW([Types.STRING(), Types.STRING()])])
>
> I'm aware that I would need something like Types.ARRAY here, but it seems to support only primitive types as array elements, not rows.
>
> Given that the JsonRowDeserializationSchema didn't work, I tried to do it with the SimpleStringSchema as follows:
>
> from pyflink.common.serialization import SimpleStringSchema
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import FlinkKafkaConsumer
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
>
> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'twitter_consumers'}
> kafka_consumer = FlinkKafkaConsumer("twitter-stream", SimpleStringSchema(), kafka_props)
>
> ds = env.add_source(kafka_consumer)
> ds.print()
> env.execute()
>
> But this didn't work either: nothing happens (no output at all).
>
> Do you have an idea what I'm doing wrong? Or should I maybe open a new thread for this? Unfortunately, there is very little documentation about this kind of stuff and how to use it.
>
> Thanks a lot,
> Giacomo
>  
>  
>
> Gesendet: Montag, 19. April 2021 um 03:51 Uhr
> Von: "Dian Fu" <[hidden email]>
> An: [hidden email]
> Cc: "user" <[hidden email]>
> Betreff: Re: PyFlink Kafka-Connector NoClassDefFoundError
>
> Hi,
>  
> You need to use the fat jar [1], as documented on the Kafka Table & SQL connector page [2].
>  
> [1] https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.2/flink-sql-connector-kafka_2.11-1.12.2.jar
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html
>  
> Regards,
> Dian
>  
> 2021年4月19日 上午1:26,[hidden email] 写道:
>
> Hi,
> I am trying to run a very basic job in PyFlink (getting Data from a Kafka-Topic and printing the stream).
>
> In the command line I run:
>
> ./bin/flink run \
> --python /home/ubuntu/load_kafka.py \
> --jarfile /home/ubuntu/flink-connector-kafka_2.12-1.12.2.jar
>
> I downloaded the jar from:
> https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
>
> Now I get the following error:
>
> File "/home/ubuntu/load_kafka.py", line 16, in <module>
>    kafka_consumer = FlinkKafkaConsumer("twitter-stream-source", json_row_schema, kafka_props)
>  File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/datastream/connectors.py", line 179, in __init__
>  File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/datastream/connectors.py", line 329, in _get_kafka_consumer
>  File "/home/ubuntu/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1553, in __call__
>  File "/home/ubuntu/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
>  File "/home/ubuntu/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.
> : java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:139)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:238)
> at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
> at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ... 15 more
>
> org.apache.flink.client.program.ProgramAbortException
> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>
> I'm thinking that I might be providing the wrong jar, but don't really have any idea.
> This is my code:
>
> from pyflink.common.serialization import JsonRowDeserializationSchema
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import FlinkKafkaConsumer
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
>
> type_info = Types.ROW([Types.ROW([Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT()]), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()]),
> Types.ROW([Types.ROW([Types.ROW([Types.BOOLEAN(), Types.STRING(), Types.STRING(), Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT()]), Types.STRING()])])]),
> Types.ROW([Types.ROW([Types.STRING(), Types.STRING()])])])
>
> json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'twitter_consumers'}
> kafka_consumer = FlinkKafkaConsumer("twitter-stream-source", json_row_schema, kafka_props)
> # research this
> kafka_consumer.set_start_from_earliest()
>
> ds = env.add_source(kafka_consumer)
> ds.print()
> ds.execute()
>
> Thanks a lot!