ClassNotFoundException with Flink 1.4.1 and Kafka connector

ClassNotFoundException with Flink 1.4.1 and Kafka connector

Debasish Ghosh
Hi -

I am facing a ClassNotFoundException while running a Flink application that reads from Kafka. It is a modified version of the NYC Taxi App.

I am using Flink 1.4.1. The application runs fine with Flink 1.3.

Here's the exception:

java.lang.ClassNotFoundException: com.lightbend.fdp.sample.flink.app.TaxiRideTSAssigner
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:521)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

I run the application as follows:

$ ./bin/flink run /Users/debasishghosh/flink/source/core/app/target/scala-2.11/app-assembly-1.0.jar --broker-list localhost:9092 --inTopic taxiin --outTopic taxiout

I verified that the jar contains the class:

$ jar tvf app-assembly-1.0.jar | grep TaxiRideTSAssigner
  2090 Wed Feb 28 12:59:52 IST 2018 com/lightbend/fdp/sample/flink/app/TaxiRideTSAssigner.class


Here are the relevant dependencies in the build:

val flinkScala            =      "org.apache.flink"             %%   "flink-scala"                    % "1.4.1" % "provided"
val flinkStreamingScala   =      "org.apache.flink"             %%   "flink-streaming-scala"          % "1.4.1" % "provided"
val flinkKafka            =      "org.apache.flink"             %%   "flink-connector-kafka-0.11"     % "1.4.1" exclude("org.slf4j", "slf4j-log4j12")


Any help?

Regards.


Re: ClassNotFoundException with Flink 1.4.1 and Kafka connector

Chesnay Schepler
Hello,

This is probably caused by a known issue in 1.4.1: https://issues.apache.org/jira/browse/FLINK-8741

This bug is not present in 1.4.0, and it will be fixed in 1.4.2, which should be released within the next few days.

As a temporary workaround, you can copy your app-assembly-1.0.jar into Flink's /lib directory.
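
The workaround above could be scripted roughly as follows. This is only a sketch: FLINK_HOME and the helper name install_jar_into_lib are assumptions made for illustration and are not part of Flink.

```shell
#!/bin/sh
# Sketch of the workaround: copy the job jar into the lib directory of the
# Flink distribution. install_jar_into_lib is a hypothetical helper name.

install_jar_into_lib() {
  jar_path="$1"     # e.g. /path/to/app-assembly-1.0.jar
  flink_home="$2"   # root of the Flink distribution (assumed layout: $flink_home/lib)
  if [ ! -d "$flink_home/lib" ]; then
    echo "no lib directory under $flink_home" >&2
    return 1
  fi
  cp "$jar_path" "$flink_home/lib/"
}

# Example call (paths are assumptions):
# install_jar_into_lib /path/to/app-assembly-1.0.jar "$FLINK_HOME"
```

Jars in lib are read when the Flink processes start, so on a standalone setup the cluster would need a restart (for example with ./bin/stop-cluster.sh followed by ./bin/start-cluster.sh) before the job is submitted again.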

Re: ClassNotFoundException with Flink 1.4.1 and Kafka connector

Debasish Ghosh
Thanks for the suggestion. I copied the application jar to lib. The original error no longer occurs, but I now get another error related to Kafka:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:949)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:681)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:93)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:350)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:856)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:342)
... 17 more

regards.

Re: ClassNotFoundException with Flink 1.4.1 and Kafka connector

Chesnay Schepler
Well we just ended up in ClassLoader hell...

There are 2 config options that could help:
  • add "org.apache.kafka." to "classloader.parent-first-patterns"
    • make sure you include the default as well ("java.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback")
  • set "classloader.resolve-order" to "parent-first"
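
As a rough sketch, the two options listed above might look like this in conf/flink-conf.yaml. The file location is an assumption (the usual place for Flink settings), and the thread does not say whether one option alone is sufficient, so the second one is left commented out here.

```yaml
# conf/flink-conf.yaml (sketch only)

# Option 1: load Kafka classes parent-first.
# The value is the default list quoted above with "org.apache.kafka." appended.
classloader.parent-first-patterns: java.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback;org.apache.kafka.

# Option 2: resolve all classes parent-first.
# classloader.resolve-order: parent-first
```

The Flink processes would need to be restarted for a change in this file to take effect.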

Re: ClassNotFoundException with Flink 1.4.1 and Kafka connector

Debasish Ghosh
Thanks for the suggestion, it works now. It also works cleanly with 1.4.0 (without the hacks).
