Simple Flink - Kafka Test


Simple Flink - Kafka Test

shotte
Hi,
I am new to Flink and Kafka.

I am trying to have Flink read from a Kafka topic and send the messages to another Kafka topic.

Here is my setup:
Flink 0.10.1
Kafka 0.9

All of this runs on a single node.

I successfully wrote a Java program that sends messages to Kafka (topic = demo), and I have a consumer (in a shell) that reads them, so that part is working.
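
For reference, the shell consumer command looks roughly like this (a sketch; the path is relative to the Kafka 0.9 distribution directory):

        bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic demo --from-beginning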

When I execute the Flink program I get the error below (see the code and the error output); it fails somewhere between step C and step D.

What am I doing wrong?

Thanks

Sylvain

<Code start>
package com.sylvain;

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * Skeleton for a Flink Job.
 *
 * For a full example of a Flink Job, see the WordCountJob.java file in the
 * same package/directory or have a look at the website.
 *
 * You can also generate a .jar file that you can submit to your Flink
 * cluster. Just type
 *     mvn clean package
 * in the project's root directory. You will find the jar in
 *     target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
 */
public class Job {

        public static void main(String[] args) throws Exception {
                // set up the execution environment
                System.out.println("Step A");
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                // connection settings for the Kafka consumer
                System.out.println("Step B");
                Properties properties = new Properties();
                properties.setProperty("bootstrap.servers", "localhost:9092");
                properties.setProperty("zookeeper.connect", "localhost:2181");
                properties.setProperty("group.id", "test");

                // read strings from the "demo" topic
                System.out.println("Step C");
                DataStream<String> messageStream = env.addSource(
                                new FlinkKafkaConsumer082<>("demo", new SimpleStringSchema(), properties));

                // prefix each message and write the result to the "demo2" topic
                System.out.println("Step D");
                messageStream.map(new MapFunction<String, String>() {
                        @Override
                        public String map(String value) throws Exception {
                                return "Blablabla " + value;
                        }
                }).addSink(new FlinkKafkaProducer<String>("localhost:9092", "demo2", new SimpleStringSchema()));

                System.out.println("Step E");
                env.execute();
                System.out.println("Step F");
        }
}
<Code end> 

<Error Start>
[shotte@localhost flink-kafka]$ flink run ./target/flink-kafka-0.1.jar
Step A
Step B
Step C
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
        at kafka.utils.Pool.<init>(Pool.scala:28)
        at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
        at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
        at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
        at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:691)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:281)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.<init>(FlinkKafkaConsumer082.java:49)
        at com.sylvain.Job.main(Job.java:64)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
        at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 20 more

<Error End>



Re: Simple Flink - Kafka Test

shotte
Do I need to go to Flink 1.0, or downgrade to Kafka 0.8?

https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html

Re: Simple Flink - Kafka Test

Chiwan Park-2
Hi shotte,

The exception is caused by a Scala version mismatch. If you want to use Scala 2.11, you have to use the Flink dependencies compiled for Scala 2.11. We have documentation about this in the wiki [1].

I hope this helps.

[1]: https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version

Regards,
Chiwan Park

> On Feb 10, 2016, at 9:39 AM, shotte <[hidden email]> wrote:
>
> Do I need to go to Flink 1.0, or downgrade to Kafka 0.8?


Re: Simple Flink - Kafka Test

Chiwan Park-2
The documentation I sent is for Flink 1.0.

In Flink 0.10.x, dependencies built for Scala 2.10 have no suffix (e.g. flink-streaming-java), while dependencies built for Scala 2.11 carry a suffix (e.g. flink-streaming-java_2.11), as shown below.
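
For example, the same dependency declared for each Scala version looks like this (a minimal pom.xml sketch for Flink 0.10.1):

        <!-- Scala 2.10 build: no suffix -->
        <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>0.10.1</version>
        </dependency>

        <!-- Scala 2.11 build: the artifactId carries the _2.11 suffix -->
        <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>0.10.1</version>
        </dependency>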

Regards,
Chiwan Park

> On Feb 10, 2016, at 1:46 PM, Chiwan Park <[hidden email]> wrote:
>
> Hi shotte,
>
> The exception is caused by a Scala version mismatch. If you want to use Scala 2.11, you have to use the Flink dependencies compiled for Scala 2.11. We have documentation about this in the wiki [1].
>
> I hope this helps.
>
> [1]: https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version
>
> Regards,
> Chiwan Park
>
>> On Feb 10, 2016, at 9:39 AM, shotte <[hidden email]> wrote:
>>
>> Do I need to go to Flink 1.0, or downgrade to Kafka 0.8?


Re: Simple Flink - Kafka Test

shotte
Hi,

Thanks for your reply, but I am still a bit confused.

I have downloaded flink-0.10.1-bin-hadoop27-scala_2.11.tgz and kafka_2.11-0.9.0.0.tgz. I did not install Scala myself.

Now tell me if I understand correctly: depending on the build of Flink I have (in my case, Scala 2.11), I must specify matching dependencies in the pom file.

But I still have the same error.

Here is an extract of my pom file:

        <dependencies>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-java</artifactId>
                        <version>${flink.version}</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-clients</artifactId>
                        <version>${flink.version}</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka</artifactId>
                        <version>0.10.1</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_2.11</artifactId>
                        <version>0.10.1</version>
                </dependency>
        </dependencies>

Re: Simple Flink - Kafka Test

shotte
OK, it is working now.

I had to change a few dependencies to use the _2.11 suffix.

Thanks

Sylvain


        <dependencies>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-java</artifactId>
                        <version>${flink.version}</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-clients_2.11</artifactId>
                        <version>${flink.version}</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka_2.11</artifactId>
                        <version>0.10.1</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_2.11</artifactId>
                        <version>0.10.1</version>
                </dependency>
        </dependencies>

Re: Simple Flink - Kafka Test

Stephan Ewen
Yes, 0.10.x does not always have Scala version suffixes.

1.0 does this consistently, which should cause less confusion...

On Wed, Feb 10, 2016 at 2:38 PM, shotte <[hidden email]> wrote:
OK, it is working now.

I had to change a few dependencies to use the _2.11 suffix.

Thanks

Sylvain


        <dependencies>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-java</artifactId>
                        <version>${flink.version}</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-clients_2.11</artifactId>
                        <version>${flink.version}</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka_2.11</artifactId>
                        <version>0.10.1</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_2.11</artifactId>
                        <version>0.10.1</version>
                </dependency>
        </dependencies>






Re: Simple Flink - Kafka Test

rmetzger0
Quick clarification on Stephan's comment: in Flink 0.10, no suffix means Scala 2.10; for Scala 2.11 you have to add the _2.11 suffix to ALL dependencies (including flink-java_2.11, flink-core_2.11, and so on).

In Flink 1.0, all artifacts that depend on Scala have a version suffix. For example, flink-clients_2.10 and flink-runtime_2.10 depend on Scala and therefore have suffixes, whereas flink-java and flink-core do not depend on Scala at all, so they carry no suffix.
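
For example, a Flink 1.0 pom mixes unsuffixed and suffixed artifacts like this (a sketch only; the 1.0.0 version number is illustrative):

        <!-- flink-java has no Scala dependency, so no suffix -->
        <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.0.0</version>
        </dependency>
        <!-- flink-clients depends on Scala, so the artifactId carries the Scala version -->
        <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.10</artifactId>
                <version>1.0.0</version>
        </dependency>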

I think both approaches are confusing and inconsistent in some sense.

On Wed, Feb 10, 2016 at 3:49 PM, Stephan Ewen <[hidden email]> wrote:
Yes, 0.10.x does not always have Scala version suffixes.

1.0 does this consistently, which should cause less confusion...

On Wed, Feb 10, 2016 at 2:38 PM, shotte <[hidden email]> wrote:
OK, it is working now.

I had to change a few dependencies to use the _2.11 suffix.

Thanks

Sylvain


        <dependencies>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-java</artifactId>
                        <version>${flink.version}</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-clients_2.11</artifactId>
                        <version>${flink.version}</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka_2.11</artifactId>
                        <version>0.10.1</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_2.11</artifactId>
                        <version>0.10.1</version>
                </dependency>
        </dependencies>



