Hello Team, I'm new to Flink and writing a Flink job that will take data from Kafka and sink it to InfluxDB. I tried using the concept this guy is using package com.dataartisans; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig; import org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint; import org.apache.flink.streaming.connectors.influxdb.InfluxDBSink; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.json.JSONObject; import java.util.*; import java.util.concurrent.TimeUnit; public class ReadFromKafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ParameterTool parameterTool = ParameterTool.fromArgs(args); DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties())); DataStream<InfluxDBPoint> formatStream = messageStream.rebalance().map(new MapFunction<String, InfluxDBPoint>() { private static final long serialVersionUID = -6867736771747690202L; @Override public InfluxDBPoint map(String value) throws Exception { JSONObject jsonObj = new JSONObject(value); HashMap<String, String> tags = new HashMap<>(); tags.put("source","kafka"); tags.put("sink","InfluxDB"); HashMap<String, Object> fields = new HashMap<>(); fields.put("first_name", jsonObj.getString("first_name")); fields.put("last_name", jsonObj.getString("last_name")); return new InfluxDBPoint("influxConnect", System.currentTimeMillis(),tags, fields); } }); InfluxDBConfig influxDBConfig = InfluxDBConfig.builder("http://localhost:8086", "root", "root", "db_flink_test") .batchActions(1000) .flushDuration(100, TimeUnit.MILLISECONDS) .enableGzip(true) .build(); formatStream.addSink(new InfluxDBSink(influxDBConfig)); env.execute("InfluxDB Sink Example"); } } this is throwing error: 12:41:51,364 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Trying to get topic metadata from broker localhost:9092 in try 0/3 12:41:51,876 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Topic flinkkafka has 1 partitions 12:41:51,928 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint must have a default constructor to be used as a POJO. Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/util/Preconditions at org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig.<init>(InfluxDBConfig.java:44) at org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig$Builder.build(InfluxDBConfig.java:221) at com.dataartisans.ReadFromKafka.main(ReadFromKafka.java:67) Caused by: java.lang.ClassNotFoundException: org.apache.flink.util.Preconditions at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 3 more Can someone please help me to get out of this problem. Thanks,
|
Hi, This is most likely an exception that indicates either that 1) you are using mismatching versions of Flink in your application code and the installed Flink cluster, or 2) your application code isn't properly packaged. From your exception, I'm guessing it is the latter case. If so, I would suggest taking a look at the POM of the Flink quickstart project to get an idea of how to package your Flink applications properly [1]. Cheers, Gordon [1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/java_api_quickstart.html On Tue, Nov 13, 2018 at 4:06 PM Abhijeet Kumar <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |