Hi,
I'm trying to integrate a Cassandra sink into my project, but I'm a bit confused because I can't find any usage examples. I just want to populate a table and query it on a single-node Cassandra instance. The only link I found is: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/cassandra.html

I also have problems with the imports. Eclipse does not recognise these:

    import org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink;
    import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

I have added the dependency to my pom.xml file:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-cassandra_2.10</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

Could you please provide some usage examples for Cassandra and explain why the Cassandra classes are not recognised?

Thanks in advance,
Andrea
I have probably solved the import issue, but I still need help finding usage examples.
Please let me know if anyone has experience using Flink and Cassandra together.
Hi,
In the doc section about Cassandra there is actually an example: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html. In a Flink job you would therefore roughly do this:

    StreamExecutionEnvironment env = ...;
    DataStream<...> input = env.addSource(...);

    CassandraSink.addSink(input)
        .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
        .setClusterBuilder(new ClusterBuilder() {
            @Override
            public Cluster buildCluster(Cluster.Builder builder) {
                return builder.addContactPoint("127.0.0.1").build();
            }
        })
        .build();

    env.execute();

Best,
Aljoscha
Ok, this is my situation:
I have a stream of Tuple2<String, Tuple6<String, String, Date, String, String, Double>>.

The Cassandra code:

    CassandraSink.addSink(stream)
        .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
            + " (user, sensor, timestamp, json_ld, observed_value, value)"
            + " VALUES (?, ?, ?, ?, ?, ?);")
        .setClusterBuilder(new ClusterBuilder() {
            @Override
            public Cluster buildCluster(Cluster.Builder builder) {
                return builder.addContactPoint("127.0.0.1").build();
            }
        })
        .build();

The values to insert in the VALUES clause are exactly the values of the Tuple6. How can I indicate that in my statement?
If I see this correctly in the code, the CassandraSink uses the value of its input stream automatically, so in your case Tuple2<String, Tuple6<String, String, Date, String, String, Double>>. What you want is for it to use only the Tuple6<String, String, Date, String, String, Double>, without the first part of the Tuple2 (probably your key?). A simple map function before adding the sink should do the trick:

    DataStream<Tuple2<String, Tuple6<String, String, Date, String, String, Double>>> dataStream = //...

    // Keep only the inner Tuple6; pass the mapped stream, not dataStream, to CassandraSink.addSink(...)
    DataStream<Tuple6<String, String, Date, String, String, Double>> values = dataStream.map(
        new MapFunction<Tuple2<String, Tuple6<String, String, Date, String, String, Double>>,
                        Tuple6<String, String, Date, String, String, Double>>() {
            @Override
            public Tuple6<String, String, Date, String, String, Double> map(
                    Tuple2<String, Tuple6<String, String, Date, String, String, Double>> value) throws Exception {
                return value.f1;
            }
        });

Nico
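To illustrate why the outer Tuple2 does not fit the statement: assuming the tuple sink binds tuple fields to the `?` placeholders by position (which is how the reply above reads the connector code), the tuple arity has to equal the number of placeholders. The following is only a plain-Java sketch of that check, with no Flink or Cassandra dependency. `PlaceholderCheck`, `countPlaceholders` and `arityMatches` are hypothetical names made up for this illustration, not part of either library, and the count is naive: it would also count a `?` that appears inside a quoted literal.

```java
final class PlaceholderCheck {

    // Naive count of '?' bind markers in a CQL statement.
    // Assumption: the statement contains no '?' inside quoted literals.
    static int countPlaceholders(String query) {
        int count = 0;
        for (int i = 0; i < query.length(); i++) {
            if (query.charAt(i) == '?') {
                count++;
            }
        }
        return count;
    }

    // True if a tuple with the given arity supplies exactly one value per placeholder.
    static boolean arityMatches(String query, int tupleArity) {
        return countPlaceholders(query) == tupleArity;
    }

    public static void main(String[] args) {
        String query = "INSERT INTO keyspace_local.values_by_sensors_users"
                + " (user, sensor, timestamp, json_ld, observed_value, value)"
                + " VALUES (?, ?, ?, ?, ?, ?);";

        System.out.println(countPlaceholders(query)); // prints 6
        System.out.println(arityMatches(query, 2));   // outer Tuple2: prints false
        System.out.println(arityMatches(query, 6));   // inner Tuple6: prints true
    }
}
```

Under that assumption, the stream of Tuple2 offers two fields for six placeholders, while the mapped stream of Tuple6 offers six, one per column in the INSERT.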