Hi All,
I am trying to connect kafka with flink. But when i am running consumer code it show some Exception. Here i posted my pom.xml and simple java code for your reference. Please take a look at it. Thanks. This is my pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.cts</groupId> <artifactId>kafka</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>kafka</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.10</artifactId> <version>1.0.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.8_2.10</artifactId> <version>1.0.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> <configuration> <archive> <manifest> <mainClass>com.cts.kafka.kafka </mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </execution> </executions> </plugin> </plugins> </build> </project> I don't have any error in this file. Now I shared my java code.Here is my problem. package org.apache.flink.kafka_01; import java.util.Properties; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; public class Kafka_01 { public static void main( String[] args ) { StreamExecutionEnvironment flinkEnv =StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "group"); DataStreamSink<String> stream = flinkEnv .addSource(new KafkaSource<String>("localhost:2181", "topic", new SimpleStringSchema(),properties)) .print(); } } When i tried to running this code , it shows some exception. Here is my error, Exception in thread "main" java.lang.Error: Unresolved compilation problem: KafkaSource cannot be resolved to a type at org.apache.flink.kafka_01.Kafka_01.main(Kafka_01.java:25) Regards, Ramkumar L |
Free forum by Nabble | Edit this page |