Kafka + Flink issue

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Kafka + Flink issue

Ramkumar
Hi All,

I am trying to connect kafka with flink. But when i am running consumer code it show some Exception.

Here i posted my pom.xml and simple java code for your reference. Please take a look at it.

Thanks.

This is my pom.xml




<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.cts</groupId>
  <artifactId>kafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafka</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
   
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.10</artifactId>
      <version>1.0.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
      <version>1.0.0</version>
    </dependency>
</dependencies>
 
  <build>
              <plugins>

                     <plugin>
                           <groupId>org.apache.maven.plugins</groupId>
                           <artifactId>maven-assembly-plugin</artifactId>
                           <executions>
                                  <execution>
                                         <phase>package</phase>
                                         <goals>
                                                <goal>single</goal>
                                         </goals>
                                         <configuration>
                                                <archive>
                                                       <manifest>
                                                              <mainClass>com.cts.kafka.kafka </mainClass>
                                                       </manifest>
                                                </archive>
                                                <descriptorRefs>
                                                       <descriptorRef>jar-with-dependencies</descriptorRef>
                                                </descriptorRefs>
                                         </configuration>
                                  </execution>
                           </executions>
                     </plugin>
              </plugins>
       </build>
 
</project>


I don't have any error in this file.



Now I shared my java code.Here is my problem.



package org.apache.flink.kafka_01;

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka_01
{
    public static void main( String[] args )
    {
    StreamExecutionEnvironment flinkEnv =StreamExecutionEnvironment.getExecutionEnvironment();

               Properties properties = new Properties();
               properties.setProperty("bootstrap.servers", "localhost:9092");
               properties.setProperty("zookeeper.connect", "localhost:2181");
               properties.setProperty("group.id", "group");
               DataStreamSink<String> stream = flinkEnv
             .addSource(new KafkaSource<String>("localhost:2181", "topic", new SimpleStringSchema(),properties))
                      .print();
    }
}


When i tried to running this code , it shows some exception.

Here is my error,


Exception in thread "main" java.lang.Error: Unresolved compilation problem:
        KafkaSource cannot be resolved to a type

        at org.apache.flink.kafka_01.Kafka_01.main(Kafka_01.java:25)





Regards,
Ramkumar L