Re: Facing error while running Hands-in training example “writing to Apache Kafka”

Posted by Ashutosh Kumar-2 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Facing-error-while-running-Hands-in-training-example-writing-to-Apache-Kafka-tp7292p7305.html

How are you packaging and deploying your jar? I have tested with Flink and Kafka 0.9, and it works fine for me.

Thanks
Ashutosh
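
One way to look into the packaging question above is to check whether the job jar actually contains the class named in the stack trace further down the thread (kafka.producer.Partitioner). The following is only a minimal sketch: the class name JarEntryCheck and the jar path passed on the command line are hypothetical, and it assumes the job is deployed as a single jar that bundles its dependencies.

```java
import java.io.IOException;
import java.util.jar.JarFile;

final class JarEntryCheck {

    /** Returns true if the jar at jarPath has a .class entry for the given class name. */
    static boolean jarContainsClass(String jarPath, String className) throws IOException {
        // Class files are stored in a jar under their package path, e.g. kafka/producer/Partitioner.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("usage: java JarEntryCheck <path-to-jar> <fully.qualified.ClassName>");
            return;
        }
        System.out.println(args[1] + " in " + args[0] + ": " + jarContainsClass(args[0], args[1]));
    }
}
```

If the class is missing from the jar, it would have to be supplied some other way at runtime (for example, on the classpath of the process that runs the job).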

On Wed, Jun 1, 2016 at 3:37 PM, ahmad Sa P <[hidden email]> wrote:
I tested it with Kafka 0.9.0.1, and the problem still exists!

On Wed, Jun 1, 2016 at 11:50 AM, Aljoscha Krettek <[hidden email]> wrote:
The Flink Kafka Consumer was never tested with Kafka 0.10. Could you try it with 0.9? The 0.10 release is still very new, and we have yet to provide a consumer for it.

On Wed, 1 Jun 2016 at 10:47 ahmad Sa P <[hidden email]> wrote:
Hi Aljoscha,
I have tried different versions of Flink (1.0.0 and 1.0.3) with Kafka version 0.10.0.0.
Ahmad



On Wed, Jun 1, 2016 at 10:39 AM, Aljoscha Krettek <[hidden email]> wrote:
This is unrelated to Joda-Time or Kryo; that's just an info message in the log.

What version of Flink and Kafka are you using? 



On Wed, 1 Jun 2016 at 07:02 arpit srivastava <[hidden email]> wrote:
Flink uses Kryo serialization, which doesn't support serializing Joda-Time objects.

Use java.util.Date, or you will have to change Kryo.

Thanks,
Arpit

On Tue, May 31, 2016 at 11:18 PM, ahmad Sa P <[hidden email]> wrote:
Hi,
I have a problem running sample code from the hands-on examples of Apache Flink.
I used the following code to send the output of a stream to an already running Apache Kafka instance, and I get the error below. Could anyone tell me what is going wrong?

Best regards
Ahmad

public class RideCleansing {

    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
    public static final String CLEANSED_RIDES_TOPIC = "mytopic";

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: stream of TaxiRide events read from the gzipped data file
        DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator("C://data/nycTaxiRides.gz", 1000.0f));

        // Keep only the rides accepted by NYCFilter
        DataStream<TaxiRide> filteredRides = rides.filter(new NYCFilter());

        // Sink: write the filtered rides to the Kafka topic, serialized with TaxiRideSchema
        filteredRides.addSink(new FlinkKafkaProducer<>(LOCAL_KAFKA_BROKER,
                CLEANSED_RIDES_TOPIC,
                new TaxiRideSchema()));

        env.execute("Taxi Ride Cleansing");
    }
}

Error:
18:43:15,734 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime is not a valid POJO type
Exception in thread "main" java.lang.NoClassDefFoundError: kafka/producer/Partitioner
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at com.dataArtisans.flinkTraining.exercises.dataStreamJava.rideCleansing.RideCleansing.main(RideCleansing.java:51)
Caused by: java.lang.ClassNotFoundException: kafka.producer.Partitioner
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 13 more
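
The NoClassDefFoundError above says that the JVM could not load kafka.producer.Partitioner at runtime. A quick way to confirm whether a given class is visible to the running program is a small check like the one below. This is only a sketch: ClasspathCheck is a hypothetical helper name, and it has to be run with the same classpath as the failing job for the result to be meaningful.

```java
final class ClasspathCheck {

    /** Returns true if the named class can be loaded by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate and load the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError, e.g. when a dependency of the class is missing
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace; another class name can be passed as an argument
        String name = args.length > 0 ? args[0] : "kafka.producer.Partitioner";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

If this prints false with the job's classpath, the jar that provides the class is not being picked up at runtime.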