Kafka0.8.2.1 + Flink0.9.0 issue


hawin

Hi All

I am preparing a Kafka and Flink performance test. To rule out mistakes on my side, I downloaded the Kafka example from http://kafka.apache.org/ and the Flink streaming Kafka example from http://flink.apache.org.

I ran both producer examples on the same cluster. The kafka.apache.org example ran without any issues, but I received the errors below when I ran the Apache Flink Kafka producer.

I have also posted both pieces of code for your reference. Please take a look.

Thanks.

 

Exception in thread "main" java.lang.Error: Unresolved compilation problems:
    The import kafka.consumer cannot be resolved
    The import kafka.consumer cannot be resolved
    The import kafka.consumer cannot be resolved
    The import kafka.consumer cannot be resolved
    The import kafka.javaapi cannot be resolved
    ConsumerConnector cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    Consumer cannot be resolved
    ConsumerConfig cannot be resolved to a type
    KafkaStream cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    KafkaStream cannot be resolved to a type
    KafkaStream cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type

    at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.<init>(KafkaSource.java:26)
    at org.apache.flink.streaming.connectors.kafka.KafkaConsumerExample.main(KafkaConsumerExample.java:42)

 

 

 

Here is the Apache Flink example:

*************************************Apache Flink***********************************************************************

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);

@SuppressWarnings({ "unused", "serial" })
DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
    public void run(Collector<String> collector) throws Exception {
        for (int i = 0; i < 20; i++) {
            collector.collect("message #" + i);
            Thread.sleep(100L);
        }
        collector.collect("q");
    }

    public void cancel() {
    }
}).addSink(
        new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema())
).setParallelism(3);

System.out.println(host + " " + port + " " + topic);

env.execute();

 

 

 

**********************************Apache Kafka***************************************************************

public Producer(String topic)
{
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("metadata.broker.list", "192.168.0.112:9092");
    // Use the random partitioner. The key type is unused, so it is simply set
    // to Integer; the message payload is a String.
    producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    this.topic = topic;
}

public void run() {
    int messageNo = 1;
    while (true)
    {
        String messageStr = "LA_" + messageNo;
        producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
        messageNo++;
    }
}

 

 

 

 

Best regards

Hawin

 

 


Re: Kafka0.8.2.1 + Flink0.9.0 issue

Márton Balassi
Dear Hawin,

This looks like a dependency issue: the Java compiler does not find the Kafka dependency. How are you trying to run this example? From an IDE, or by submitting it to a Flink cluster with bin/flink run? And how do you define your dependencies? Do you use Maven or sbt, for instance?

Best,

Marton


RE: Kafka0.8.2.1 + Flink0.9.0 issue

hawin

Dear Marton

Thanks for your support again.

I am running these examples in the same project, and I am using the Eclipse IDE to submit the job to my Flink cluster.

Here are my dependencies:

******************************************************************************

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>0.9.0-milestone-1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>0.9.0-milestone-1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-connectors</artifactId>
        <version>0.9.0-milestone-1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-core</artifactId>
        <version>0.9.0-milestone-1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.1</version>
    </dependency>
</dependencies>

 

*****************************************************************************************

 

 

 

 

Best regards

Email: [hidden email]

 


 


Re: Kafka0.8.2.1 + Flink0.9.0 issue

Márton Balassi
Dear Hawin,

No problem, I am glad that you are giving our Kafka connector a try. :)
The dependencies listed look good. Can you run the example locally from Eclipse with 'Run'? I suspect that your Flink cluster may not have access to the Kafka dependency.

As a quick test, you could copy the Kafka jars into the lib folder of your Flink distribution on all the machines in your cluster; everything there goes onto Flink's classpath. Another workaround would be to build a fat jar for your project containing all the dependencies with 'mvn assembly:assembly'. Neither of these is elegant, but they would help track down the root cause.
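For concreteness, the two workarounds above might look like the following commands. This is only an illustrative sketch: the install paths and jar file names are assumptions and must be adjusted to the actual Kafka and Flink installations.

```shell
# Workaround 1: put the Kafka jars on Flink's classpath on every machine.
# (Paths below are hypothetical; adjust to your own install locations.)
cp /opt/kafka_2.10-0.8.2.1/libs/kafka_2.10-0.8.2.1.jar      /opt/flink-0.9.0/lib/
cp /opt/kafka_2.10-0.8.2.1/libs/kafka-clients-0.8.2.1.jar   /opt/flink-0.9.0/lib/
# Restart the Flink cluster so the new jars are picked up.

# Workaround 2: build a fat jar that bundles all project dependencies,
# assuming the maven-assembly-plugin is configured in the pom.
mvn assembly:assembly
# The resulting target/<artifactId>-<version>-jar-with-dependencies.jar
# can then be submitted with bin/flink run.
```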




Re: Kafka0.8.2.1 + Flink0.9.0 issue

hawin
Dear Marton

What do you mean by running it locally in Eclipse with 'Run'?
Do you want me to run it on the NameNode?
My NameNode does not have Kafka installed; I only installed Kafka on my DataNode servers.
Do I need to install or copy the Kafka jars onto the NameNode? I would rather not install everything on the NameNode server.
Please advise.
Thanks.


My Flink and Hadoop cluster layout is as follows:

Flink master on NameNode
Kafka, ZooKeeper, and Flink slave1 on DataNode1
Kafka, ZooKeeper, and Flink slave2 on DataNode2
Kafka, ZooKeeper, and Flink slave3 on DataNode3






Re: Kafka0.8.2.1 + Flink0.9.0 issue

Márton Balassi
By "locally" I meant the machine that you use for development, to see whether this works without parallelism. :-) No need to install anything on your NameNode, of course.
Installing Kafka on a machine and having the Kafka Java dependencies available to Flink are two very different things. Try adding the following [1] to your Maven pom, then execute 'mvn assembly:assembly'; this will produce a fat jar suffixed jar-with-dependencies.jar. You should be able to run the example from that.





Re: Kafka0.8.2.1 + Flink0.9.0 issue

hawin
Hi Marton

Do I have to add the whole pom.xml file, or just the plugin as below?
I saw L286 to L296 are not correct information in pom.xml.
Thanks.



<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
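As a sketch of where such a snippet would sit: a Maven plugin declaration belongs inside the pom's <build>/<plugins> section, not inside <dependencies>. The fragment below only restates the plugin quoted above in that context; the surrounding structure is the standard pom layout, not something specific to this thread.

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.4</version>
            <configuration>
                <descriptorRefs>
                    <!-- Predefined descriptor that bundles all dependencies -->
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>
```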





Re: Kafka0.8.2.1 + Flink0.9.0 issue

Ashutosh Kumar
I use the following dependencies and they work fine.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>0.9-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.9-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.9-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.9-SNAPSHOT</version>
</dependency>

On Mon, Jun 22, 2015 at 10:07 PM, Hawin Jiang <[hidden email]> wrote:
Hi  Marton

I have to add whole pom.xml file or just only plugin as below. 
I saw L286 to L296 are not correct information in pom.xml.
Thanks.



<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>

On Thu, Jun 11, 2015 at 1:43 AM, Márton Balassi <[hidden email]> wrote:
As for locally I meant the machine that you use for development to see whether this works without parallelism. :-) No need to install stuff on your Namenode of course.
Installing Kafka on a machine and having the Kafka Java dependencies available for Flink are two very different things. Try adding the following [1] to your maven pom. Then execute 'mvn assembly:assembly', this will produce a fat jar suffiexed jar-with-dependencies.jar. You should be able to run the example form that.


On Thu, Jun 11, 2015 at 10:32 AM, Hawin Jiang <[hidden email]> wrote:
Dear Marton

What do you meaning for locally Eclipse with 'Run'.  
Do you want to me to run it on Namenode?  
But my namenode didn't install Kafka.  I only installed Kafka on my data node servers. 
Do I need to install or copy Kafka jar on Namenode? Actually, I don't want to install everything on Name node server. 
Please advise me.
Thanks. 


My Flink and Hadoop cluster info as below.

Flink on NameNode
Kafka,Zookeeper and FLink slave1 on Datanode1
Kafka,Zookeeper ,and Flink slave2 on Datanode2
Kafka, Zookeeper and Flink slave3 on Datanode3



On Thu, Jun 11, 2015 at 1:16 AM, Márton Balassi <[hidden email]> wrote:
Dear Hawin,

No problem, I am gald that you are giving our Kafka connector a try. :)
The dependencies listed look good. Can you run the example locally from Eclipse with 'Run'? I suspect that maybe your Flink cluster does not have the access to the kafka dependency then. 

As a quick test you could copy the kafka jars to the lib folder of your Flink distribution on all the machines in your cluster. Everything that is there goes to the classpath of Flink. Another workaround with be to build a fat jar for your project containing all the dependencies with 'mvn assembly:assembly'. Neither of these are beautiful but would help tracking down the root cause.

On Thu, Jun 11, 2015 at 10:04 AM, Hawin Jiang <[hidden email]> wrote:

Dear Marton

 

Thanks for supporting again.

I am running these examples at the same project and I am using Eclipse IDE to submit it to my Flink cluster.

 

 

Here is my dependencies

******************************************************************************

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>0.9.0-milestone-1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>0.9.0-milestone-1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-connectors</artifactId>
            <version>0.9.0-milestone-1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-core</artifactId>
            <version>0.9.0-milestone-1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>
    </dependencies>

 

*****************************************************************************************

 

 

 

 

Best regards

Email: [hidden email]

 

From: Márton Balassi [mailto:[hidden email]]
Sent: Thursday, June 11, 2015 12:58 AM
To: [hidden email]
Subject: Re: Kafka0.8.2.1 + Flink0.9.0 issue

 

Dear Hawin,

 

This looks like a dependency issue: the Java compiler does not find the Kafka dependency. How are you trying to run this example? Is it from an IDE, or are you submitting it to a Flink cluster with bin/flink run? How do you define your dependencies? Do you use Maven or sbt, for instance?

 

Best,

 

Marton

 

On Thu, Jun 11, 2015 at 9:43 AM, Hawin Jiang <[hidden email]> wrote:

 







Re: Kafka0.8.2.1 + Flink0.9.0 issue

Márton Balassi
Dear Hawin,

Sorry, I have managed to link to a pom that has been changed in the meantime. But we have added a section to our doc clarifying your question. [1] Since then, Stephan has proposed an even nicer solution that did not make it into the doc yet: if you start from our quickstart pom, add your dependencies to it, and simply execute 'mvn package -Pbuild-jar', you get a jar with all the code that is needed to run it on the cluster, but not more. See [3] for more on the quickstart.


On Tue, Jun 23, 2015 at 6:48 AM, Ashutosh Kumar <[hidden email]> wrote:
I use following dependencies and it works fine .

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>0.9-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.9-SNAPSHOT</version>
</dependency>

On Mon, Jun 22, 2015 at 10:07 PM, Hawin Jiang <[hidden email]> wrote:
Hi  Marton

Do I have to add the whole pom.xml file, or just the plugin below?
I saw that L286 to L296 contain incorrect information in pom.xml.
Thanks.



<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>

On Thu, Jun 11, 2015 at 1:43 AM, Márton Balassi <[hidden email]> wrote:
By locally I meant the machine that you use for development, to see whether this works without parallelism. :-) No need to install stuff on your Namenode of course.
Installing Kafka on a machine and having the Kafka Java dependencies available for Flink are two very different things. Try adding the following [1] to your Maven pom. Then execute 'mvn assembly:assembly'; this will produce a fat jar suffixed jar-with-dependencies.jar. You should be able to run the example from that.


On Thu, Jun 11, 2015 at 10:32 AM, Hawin Jiang <[hidden email]> wrote:
Dear Marton

What do you mean by running it locally in Eclipse with 'Run'?
Do you want me to run it on the Namenode?
My Namenode does not have Kafka installed; I only installed Kafka on my data node servers.
Do I need to install or copy the Kafka jar onto the Namenode? Actually, I don't want to install everything on the Namenode server.
Please advise me.
Thanks.


My Flink and Hadoop cluster info is as below:

Flink on NameNode
Kafka, Zookeeper, and Flink slave1 on Datanode1
Kafka, Zookeeper, and Flink slave2 on Datanode2
Kafka, Zookeeper, and Flink slave3 on Datanode3



On Thu, Jun 11, 2015 at 1:16 AM, Márton Balassi <[hidden email]> wrote:
Dear Hawin,

No problem, I am glad that you are giving our Kafka connector a try. :)
The dependencies listed look good. Can you run the example locally from Eclipse with 'Run'? I suspect that your Flink cluster does not have access to the kafka dependency.

As a quick test you could copy the kafka jars to the lib folder of your Flink distribution on all the machines in your cluster. Everything that is there goes on the classpath of Flink. Another workaround would be to build a fat jar for your project containing all the dependencies with 'mvn assembly:assembly'. Neither of these is beautiful, but they would help track down the root cause.

On Thu, Jun 11, 2015 at 10:04 AM, Hawin Jiang <[hidden email]> wrote:


Re: Kafka0.8.2.1 + Flink0.9.0 issue

hawin
Dear Marton

I have upgraded my Flink to 0.9.0, but I could not consume data from Kafka with Flink.
I have followed your example exactly.
Please help me.
Thanks.


Here is my code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);

DataStream<String> kafkaStream = env
    .addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
kafkaStream.print();

env.execute();


Here are some errors:

15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutting down
15/06/25 22:57:52 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Stopped 
15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutdown completed
15/06/25 22:57:52 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1435298169147] All connections stopped
15/06/25 22:57:52 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
15/06/25 22:57:52 INFO zookeeper.ZooKeeper: Session: 0x14e2e5b2dad000a closed
15/06/25 22:57:52 INFO consumer.ZookeeperConsumerConnector: [flink-group_hawin-1435298168910-10520844], ZKConsumerConnector shutdown completed in 40 ms
15/06/25 22:57:52 ERROR tasks.SourceStreamTask: Custom Source -> Stream Sink (3/4) failed
org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 68617769
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:232)
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:40)
at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:24)
at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.run(KafkaSource.java:193)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid stream header: 68617769
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:222)
... 8 more

On Tue, Jun 23, 2015 at 6:31 AM, Márton Balassi <[hidden email]> wrote:

Re: Kafka0.8.2.1 + Flink0.9.0 issue

hawin
Dear Marton


Here are some errors when I run KafkaProducerExample.java from Eclipse.

kafka.common.KafkaException: fetching topic metadata for topics [Set(flink-kafka-topic)] from broker [ArrayBuffer(id:0,host:192.168.0.112,port:2181)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at org.apache.flink.streaming.connectors.kafka.api.KafkaSink.invoke(KafkaSink.java:183)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:102)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
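One detail worth noting in the trace above: the metadata fetch targets broker host 192.168.0.112 on port 2181, which is the conventional ZooKeeper client port. Kafka 0.8 brokers listen on 9092 by default, and the old producer's metadata.broker.list must point at the broker port, not at ZooKeeper. A minimal sketch of the corrected setting (the class name is hypothetical; the host is taken from the stack trace):

```java
import java.util.Properties;

// Hypothetical sketch: configuring the Kafka 0.8.x producer broker list.
// "metadata.broker.list" must list Kafka broker endpoints (default port 9092);
// 2181 is the ZooKeeper client port and will close the connection on a
// metadata request, producing the EOFException seen above.
public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.0.112:9092"); // broker, not :2181
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        System.out.println(props.getProperty("metadata.broker.list"));
    }
}
```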

On Thu, Jun 25, 2015 at 11:06 PM, Hawin Jiang <[hidden email]> wrote:

Re: Kafka0.8.2.1 + Flink0.9.0 issue

Aljoscha Krettek
Hi,
could you please try replacing JavaDefaultStringSchema() with SimpleStringSchema() in your first example, the one where you get this exception:
org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 68617769

Cheers,
Aljoscha
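
For what it's worth, the "invalid stream header: 68617769" message itself points at the mismatch: 0x68 0x61 0x77 0x69 is not the Java serialization stream magic (0xAC 0xED) but the ASCII text "hawi", i.e. the raw bytes of a plain-string message. JavaDefaultStringSchema tries to deserialize each Kafka message as a Java-serialized object, so it fails on messages that were produced as plain strings. A quick check of the header bytes (the class name is hypothetical):

```java
import java.nio.charset.StandardCharsets;

// Decode the four "invalid stream header" bytes as ASCII: they are plain
// text, not a Java-serialized object (whose stream starts with 0xAC 0xED).
public class StreamHeaderCheck {
    public static void main(String[] args) {
        byte[] header = {0x68, 0x61, 0x77, 0x69};
        String decoded = new String(header, StandardCharsets.US_ASCII);
        System.out.println(decoded); // prints "hawi"
    }
}
```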

On Fri, 26 Jun 2015 at 08:21 Hawin Jiang <[hidden email]> wrote:
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid stream header: 68617769
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:222)
... 8 more
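A side note worth decoding (an editorial aside, not from the original thread): Java serialization streams start with the magic header 0xACED, but the header in the exception, 68617769, is just ASCII text. A minimal standalone check, plain Java with no Flink involved:

```java
// Decode the four header bytes reported in the exception message.
public class StreamHeaderCheck {
    public static void main(String[] args) {
        int header = 0x68617769; // from "invalid stream header: 68617769"
        byte[] bytes = {
            (byte) (header >>> 24), (byte) (header >>> 16),
            (byte) (header >>> 8),  (byte) header
        };
        // Prints "hawi": the Kafka message starts with raw UTF-8 text,
        // not a Java-serialized object, which is why a schema based on
        // ObjectInputStream rejects it.
        System.out.println(new String(bytes, java.nio.charset.StandardCharsets.US_ASCII));
    }
}
```

In other words, the producer wrote plain strings while the consumer's schema expected Java-serialized objects.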

On Tue, Jun 23, 2015 at 6:31 AM, Márton Balassi <[hidden email]> wrote:
Dear Hawin,

Sorry, I have managed to link to a pom that has been changed in the meantime. But we have added a section to our docs clarifying your question. [1] Since then Stephan has proposed an even nicer solution that has not made it into the docs yet: if you start from our quickstart pom and add your dependencies to it, then simply executing 'mvn package -Pbuild-jar' gives you a jar with all the code needed to run it on the cluster, but nothing more. See [3] for more on the quickstart.
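For readers following along, the quickstart approach described here boils down to a Maven profile that bundles the project's dependencies into a single jar. The fragment below is an illustrative sketch of that idea only; the plugin version and exact structure are assumptions, not copied from the actual quickstart pom:

```xml
<!-- Sketch: a build profile that shades dependencies into one runnable jar. -->
<profiles>
  <profile>
    <id>build-jar</id>
    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>2.3</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>
  </profile>
</profiles>
```

With such a profile, `mvn package -Pbuild-jar` produces a jar containing the user code plus its third-party dependencies.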


On Tue, Jun 23, 2015 at 6:48 AM, Ashutosh Kumar <[hidden email]> wrote:
I use following dependencies and it works fine .

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>0.9-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.9-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.9-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.9-SNAPSHOT</version>
</dependency>
<dependency>

On Mon, Jun 22, 2015 at 10:07 PM, Hawin Jiang <[hidden email]> wrote:
Hi  Marton

Do I have to add the whole pom.xml file, or just the plugin below?
I saw that L286 to L296 of the pom.xml contain incorrect information.
Thanks.



<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>

On Thu, Jun 11, 2015 at 1:43 AM, Márton Balassi <[hidden email]> wrote:
By 'locally' I meant the machine that you use for development, to see whether this works without parallelism. :-) No need to install stuff on your NameNode, of course.
Installing Kafka on a machine and having the Kafka Java dependencies available to Flink are two very different things. Try adding the following [1] to your Maven pom, then execute 'mvn assembly:assembly'; this will produce a fat jar suffixed jar-with-dependencies.jar. You should be able to run the example from that.


On Thu, Jun 11, 2015 at 10:32 AM, Hawin Jiang <[hidden email]> wrote:
Dear Marton

What do you mean by running it locally from Eclipse with 'Run'?
Do you want me to run it on the NameNode?
But Kafka is not installed on my NameNode; I only installed it on my DataNode servers.
Do I need to install or copy the Kafka jar onto the NameNode? Actually, I don't want to install everything on the NameNode server.
Please advise me.
Thanks. 


My Flink and Hadoop cluster layout is as follows:

Flink on NameNode
Kafka, ZooKeeper, and Flink slave1 on DataNode1
Kafka, ZooKeeper, and Flink slave2 on DataNode2
Kafka, ZooKeeper, and Flink slave3 on DataNode3



On Thu, Jun 11, 2015 at 1:16 AM, Márton Balassi <[hidden email]> wrote:
Dear Hawin,

No problem, I am glad that you are giving our Kafka connector a try. :)
The dependencies listed look good. Can you run the example locally from Eclipse with 'Run'? I suspect that your Flink cluster may not have access to the Kafka dependency.

As a quick test you could copy the Kafka jars into the lib folder of your Flink distribution on all the machines in your cluster; everything in that folder goes onto Flink's classpath. Another workaround would be to build a fat jar for your project containing all the dependencies with 'mvn assembly:assembly'. Neither of these is elegant, but they would help track down the root cause.
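A deployment sketch of the first workaround (the jar-copying quick test) might look like the following. The host names and install paths here are assumptions for illustration, not taken from the thread:

```shell
# Copy the Kafka client jar into Flink's lib folder on every machine;
# everything in lib/ lands on Flink's classpath at startup.
# (Hypothetical hosts and paths.)
for host in datanode1 datanode2 datanode3; do
  scp ~/.m2/repository/org/apache/kafka/kafka_2.10/0.8.2.1/kafka_2.10-0.8.2.1.jar \
      "$host":/opt/flink/lib/
done
# Restart the Flink TaskManagers afterwards so the new jars are picked up.
```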

On Thu, Jun 11, 2015 at 10:04 AM, Hawin Jiang <[hidden email]> wrote:

Dear Marton

 

Thanks for supporting again.

I am running these examples in the same project, and I am using the Eclipse IDE to submit them to my Flink cluster.

 

 

Here are my dependencies:

******************************************************************************

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>0.9.0-milestone-1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>0.9.0-milestone-1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-connectors</artifactId>
        <version>0.9.0-milestone-1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-core</artifactId>
        <version>0.9.0-milestone-1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.1</version>
    </dependency>
</dependencies>

 

*****************************************************************************************

 

 

 

 

Best regards

Email: [hidden email]

 

From: Márton Balassi [mailto:[hidden email]]
Sent: Thursday, June 11, 2015 12:58 AM
To: [hidden email]
Subject: Re: Kafka0.8.2.1 + Flink0.9.0 issue

 

Dear Hawin,

 

This looks like a dependency issue: the Java compiler does not find the Kafka dependency. How are you trying to run this example? Is it from an IDE, or are you submitting it to a Flink cluster with bin/flink run? How do you define your dependencies? Do you use Maven or sbt, for instance?

 

Best,

 

Marton

 

On Thu, Jun 11, 2015 at 9:43 AM, Hawin Jiang <[hidden email]> wrote:

 










Re: Kafka0.8.2.1 + Flink0.9.0 issue

hawin
Hi  Aljoscha

You are the best. 
Thank you very much.
It is working now.



Best regards
Hawin

On Fri, Jun 26, 2015 at 12:28 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
could you please try replacing JavaDefaultStringSchema() with SimpleStringSchema() in your first example, the one where you get this exception:
org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 68617769

Cheers,
Aljoscha
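The suggested swap can be illustrated outside Flink entirely. The sketch below is not Flink's actual schema code; it mimics what the two schemas do with the same raw bytes (per the stack traces in this thread, SimpleStringSchema essentially decodes bytes to a String, while JavaDefaultStringSchema runs them through Java deserialization):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.StreamCorruptedException;
import java.nio.charset.StandardCharsets;

public class SchemaContrast {
    public static void main(String[] args) {
        // What the producer actually wrote to Kafka: raw UTF-8 bytes.
        byte[] raw = "hawin".getBytes(StandardCharsets.UTF_8);

        // SimpleStringSchema-style decoding: interpret the bytes as text. Works.
        System.out.println(new String(raw, StandardCharsets.UTF_8)); // hawin

        // JavaDefaultStringSchema-style decoding: expect a Java-serialized
        // object. The bytes lack the 0xACED magic header, so this fails.
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            System.out.println(in.readObject());
        } catch (StreamCorruptedException e) {
            // Prints: invalid stream header: 68617769
            System.out.println(e.getMessage());
        } catch (IOException | ClassNotFoundException e) {
            System.out.println(e);
        }
    }
}
```

This reproduces the exact "invalid stream header: 68617769" failure from the earlier message and shows why decoding the bytes as a plain string fixes it.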

On Fri, 26 Jun 2015 at 08:21 Hawin Jiang <[hidden email]> wrote:
Dear Marton


Here are some errors when I run KafkaProducerExample.java from Eclipse.

kafka.common.KafkaException: fetching topic metadata for topics [Set(flink-kafka-topic)] from broker [ArrayBuffer(id:0,host:192.168.0.112,port:2181)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at org.apache.flink.streaming.connectors.kafka.api.KafkaSink.invoke(KafkaSink.java:183)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:102)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
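An editorial note on the trace above: the broker list shows port 2181, which is ZooKeeper's default client port, not a Kafka broker port. The old producer API's metadata.broker.list must point at the Kafka broker itself (9092 by default); connecting to ZooKeeper instead would explain the immediate EOF when fetching topic metadata. A hedged config sketch, with the host taken from the log and the port assumed to be the Kafka default:

```properties
# Point the producer at the Kafka broker, not at ZooKeeper.
metadata.broker.list=192.168.0.112:9092
```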
