Flink Streaming sink to InfluxDB


Flink Streaming sink to InfluxDB

Abhijeet Kumar
Hello Team,

I'm new to Flink and am writing a Flink job that reads data from Kafka and sinks it to InfluxDB. I tried following the approach this person uses:
 

package com.dataartisans;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;

public class ReadFromKafka {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The Kafka topic and connection properties are passed on the command line
        // and forwarded to the consumer via ParameterTool.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        DataStream<String> messageStream = env.addSource(
                new FlinkKafkaConsumer082<>(
                        parameterTool.getRequired("topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        // Parse each JSON record from Kafka into an InfluxDBPoint for the sink.
        DataStream<InfluxDBPoint> formatStream = messageStream
                .rebalance()
                .map(new MapFunction<String, InfluxDBPoint>() {
                    private static final long serialVersionUID = -6867736771747690202L;

                    @Override
                    public InfluxDBPoint map(String value) throws Exception {
                        JSONObject jsonObj = new JSONObject(value);

                        HashMap<String, String> tags = new HashMap<>();
                        tags.put("source", "kafka");
                        tags.put("sink", "InfluxDB");

                        HashMap<String, Object> fields = new HashMap<>();
                        fields.put("first_name", jsonObj.getString("first_name"));
                        fields.put("last_name", jsonObj.getString("last_name"));

                        // Measurement name, timestamp, tags, and fields.
                        return new InfluxDBPoint("influxConnect", System.currentTimeMillis(), tags, fields);
                    }
                });

        // Write batches of up to 1000 points, flushing every 100 ms.
        InfluxDBConfig influxDBConfig = InfluxDBConfig
                .builder("http://localhost:8086", "root", "root", "db_flink_test")
                .batchActions(1000)
                .flushDuration(100, TimeUnit.MILLISECONDS)
                .enableGzip(true)
                .build();
        formatStream.addSink(new InfluxDBSink(influxDBConfig));

        env.execute("InfluxDB Sink Example");
    }
}
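
The job would be launched along these lines (a hypothetical invocation, not from the original post; the jar name is a placeholder, and the Kafka 0.8 connector typically expects zookeeper.connect, group.id, and bootstrap.servers among the supplied properties):

    flink run target/readfromkafka.jar \
        --topic flinkkafka \
        --bootstrap.servers localhost:9092 \
        --zookeeper.connect localhost:2181 \
        --group.id flink-influx-test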

When I run it, it throws this error:

12:41:51,364 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Trying to get topic metadata from broker localhost:9092 in try 0/3
12:41:51,876 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Topic flinkkafka has 1 partitions
12:41:51,928 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint must have a default constructor to be used as a POJO.
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/util/Preconditions
at org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig.<init>(InfluxDBConfig.java:44)
at org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig$Builder.build(InfluxDBConfig.java:221)
at com.dataartisans.ReadFromKafka.main(ReadFromKafka.java:67)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.util.Preconditions
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 3 more

Can someone please help me solve this problem?

Thanks,


Abhijeet Kumar
Software Development Engineer,
Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data !


Re: Flink Streaming sink to InfluxDB

Tzu-Li (Gordon) Tai
Hi,

This exception most likely indicates one of two things: 1) the Flink version used in your application code doesn't match the version of the installed Flink cluster, or 2) your application code isn't packaged properly.
From your exception, I'm guessing it is the latter. If so, I would suggest taking a look at the POM of the Flink quickstart project to get an idea of how to package Flink applications properly [1].

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/java_api_quickstart.html  
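
For illustration (this snippet is not from the thread): org.apache.flink.util.Preconditions lives in flink-core, so the NoClassDefFoundError means the core Flink classes were missing from the runtime classpath. The quickstart POM addresses packaging by building a fat jar with the maven-shade-plugin, so that connector dependencies ship with the job while core Flink dependencies are left to the cluster. A minimal sketch along those lines, with artifact names and the Scala suffix assumed from the imports in the code above:

    <build>
      <plugins>
        <!-- Build a fat jar: the Kafka and InfluxDB connectors and org.json
             are bundled with the job; core Flink artifacts are excluded
             because the cluster already provides them. -->
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <artifactSet>
                  <excludes>
                    <exclude>org.apache.flink:flink-core</exclude>
                    <exclude>org.apache.flink:flink-java</exclude>
                    <exclude>org.apache.flink:flink-streaming-java_2.11</exclude>
                  </excludes>
                </artifactSet>
              </configuration>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>

Note also that if the job is run from the IDE (as the "Exception in thread main" and the plain AppClassLoader in the trace suggest), core Flink dependencies declared with <scope>provided</scope> won't be on the classpath at all, which produces exactly this kind of NoClassDefFoundError.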
