Writing on Cassandra

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Writing on Cassandra

AndreaKinn
Hi,
I'm trying to integrate a Cassandra sink in my project but honestly I'm a bit confused because I don't find any examples of use.

I want just to populate a table and query it on a single node instance of Cassandra.

The only one link I found is: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/cassandra.html

but I have problems also with imports:

import org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

not recognised by eclipse.

I have added dependency in pom.xml file:

<dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-cassandra_2.10</artifactId>
                        <version>${flink.version}</version>
                        <scope>provided</scope>
</dependency>


Please, can you provide me some examples of use of Cassandra and clarify me why Cassandra classes are not recognised?

Thanks in advance,
Andrea
Reply | Threaded
Open this post in threaded view
|

Re: Writing on Cassandra

AndreaKinn
I probably solved import issue, but still need help to find some examples of use.
Please let me know if someone has experience with Flink and Cassandra together
Reply | Threaded
Open this post in threaded view
|

Re: Writing on Cassandra

Aljoscha Krettek
Hi,

In the doc section about Cassandra there is actually an example: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html.

In a Flink Job you would therefore roughly do this:

StreamExecutionEnvironment env = ...:

DataStream<> input = env.addSource(...);

CassandraSink.addSink(input)
  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
  .setClusterBuilder(new ClusterBuilder() {
    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
      return builder.addContactPoint("127.0.0.1").build();
    }
  }).build();

env.execute();

Best,
Aljoscha

On 8. Aug 2017, at 21:08, AndreaKinn <[hidden email]> wrote:

I probably solved import issue, but still need help to find some examples of
use.
Please let me know if someone has experience with Flink and Cassandra
together



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Writing-on-Cassandra-tp14744p14745.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: Writing on Cassandra

AndreaKinn
Ok, this is my situation:

I have a stream of Tuple2<String, Tuple6<String, String, Date, String, String, Double>>

the cassandra code:

CassandraSink.addSink(stream)
                  .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
                + " (user, sensor, timestamp, json_ld, observed_value, value)"
                + " VALUES (?, ?, ?, ?, ?, ?);")
                  .setClusterBuilder(new ClusterBuilder() {
                    @Override
                    public Cluster buildCluster(Cluster.Builder builder) {
                      return builder.addContactPoint("127.0.0.1").build();
                    }
                  })
                  .build();

the values to insert in VALUES clause are exactly the values of Tuple6. How can I indicate that to my statement ?
Reply | Threaded
Open this post in threaded view
|

Re: Writing on Cassandra

Nico Kruber
If I see this correctly in the code, the CassandraSink is using the value of
its input stream automatically, so in your case
Tuple2<String, Tuple6<String, String, Date, String, String, Double>>

What you want is it to use only
Tuple6<String, String, Date, String, String, Double>
without the first first part of Tuple2 (probably your key?).

A simple map function before adding the sink should to the trick:

DataStream<Tuple2<String, Tuple6<String, String, Date, String, String,
Double>>> dataStream = //...
dataStream.map(new MapFunction<Tuple2<String, Tuple6<String, String, Date,
String, String, Double>>, Tuple6<String, String, Date, String, String,
Double>>() {
    @Override
    public Tuple6<String, String, Date, String, String, Double>
map(Tuple2<String, Tuple6<String, String, Date, String, String, Double>>
value) throws Exception {
        return value.f1;
    }
});


Nico

On Sunday, 13 August 2017 19:23:54 CEST AndreaKinn wrote:

> Ok, this is my situation:
>
> I have a stream of Tuple2<String, Tuple6&lt;String, String, Date, String,
> String, Double>>
>
> the cassandra code:
>
> CassandraSink.addSink(stream)
>  .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
>                 + " (user, sensor, timestamp, json_ld, observed_value,
> value)"
>                 + " VALUES (?, ?, ?, ?, ?, ?);")
>  .setClusterBuilder(new ClusterBuilder() {
>    @Override
>    public Cluster buildCluster(Cluster.Builder builder) {
>      return builder.addContactPoint("127.0.0.1").build();
>    }
>  })
>  .build();
>
> the values to insert in VALUES clause are exactly the values of Tuple6. How
> can I indicate that to my statement ?
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Writing
> -on-Cassandra-tp14744p14867.html Sent from the Apache Flink User Mailing
> List archive. mailing list archive at Nabble.com.


signature.asc (201 bytes) Download Attachment