Cassandra Connector

kaelumania
Hello,

I was wondering why the Cassandra connector has such an unusual interface:
CassandraSink<Reading> csink = CassandraSink.addSink(readings);
while all other sinks seem to look like

RMQSink<Reading> sink = new RMQSink<Reading>(cfg, "readings_persist_out", new JSONReadingSchema());
readings.addSink(sink);
best,
Stephan



Re: Cassandra Connector

Chesnay Schepler
Hello,

The CassandraSink is not implemented as a sink but as a special operator, so you wouldn't be able to use the
addSink() method. (I can't remember the actual method being used.)

There are also several different implementations for various types (tuples, POJOs, Scala case classes), but we
did not want the user to have to be aware of them. This has the neat property that we can change the underlying
classes any way we want (like modifying the constructor) without breaking anything.
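
The design idea in the paragraph above (a static entry point that picks the implementation, so the concrete classes stay free to change) can be sketched in plain Java without any Flink dependency. Every name here (SinkFactorySketch, Stream, Sink, TupleSink, PojoSink) is a hypothetical stand-in invented for illustration; this is not the connector's actual code, and the real selection logic may well differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SinkFactorySketch {

    /** Hypothetical sink: consumes records and remembers what it "wrote". */
    interface Sink<T> {
        void invoke(T value);
        List<String> written();
    }

    /** Hypothetical stream: knows its element type and forwards records to its sinks. */
    static final class Stream<T> {
        private final Class<T> elementType;
        private final List<Sink<T>> sinks = new ArrayList<>();

        Stream(Class<T> elementType) { this.elementType = elementType; }

        Class<T> elementType() { return elementType; }

        // Style 1: the caller chooses and constructs the concrete sink class.
        void addSink(Sink<T> sink) { sinks.add(sink); }

        void emit(T value) {
            for (Sink<T> sink : sinks) { sink.invoke(value); }
        }
    }

    /** Implementation for array-based "tuples" (assumed representation). */
    static final class TupleSink<T> implements Sink<T> {
        private final List<String> rows = new ArrayList<>();
        @Override public void invoke(T value) { rows.add(Arrays.toString((Object[]) value)); }
        @Override public List<String> written() { return rows; }
    }

    /** Implementation for every other element type. */
    static final class PojoSink<T> implements Sink<T> {
        private final List<String> rows = new ArrayList<>();
        @Override public void invoke(T value) { rows.add(String.valueOf(value)); }
        @Override public List<String> written() { return rows; }
    }

    // Style 2: a static factory inspects the stream and picks the implementation.
    // Callers only ever see the Sink interface, so constructors can change freely.
    static <T> Sink<T> addSink(Stream<T> stream) {
        Sink<T> sink;
        if (Object[].class.isAssignableFrom(stream.elementType())) {
            sink = new TupleSink<>();
        } else {
            sink = new PojoSink<>();
        }
        stream.addSink(sink);
        return sink;
    }

    public static void main(String[] args) {
        // Style 1: explicit construction, as with most other sinks.
        Stream<String> readings = new Stream<>(String.class);
        PojoSink<String> explicit = new PojoSink<>();
        readings.addSink(explicit);
        readings.emit("r1");

        // Style 2: the factory hides which class is used.
        Stream<Object[]> tuples = new Stream<>(Object[].class);
        Sink<Object[]> hidden = addSink(tuples);
        tuples.emit(new Object[] {"sensor-1", 21.5});

        System.out.println(explicit.written());
        System.out.println(hidden.written());
    }
}
```

The trade-off shows up in main: with the factory the caller never names TupleSink or PojoSink, which is what leaves room to change them later, at the cost of an entry point that looks different from the usual stream.addSink(...) call.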

Regards,
Chesnay

(In reply to Stephan Epping's message of 22.11.2016 08:06.)

Re: Cassandra Connector

Chesnay Schepler
Actually, this is a bit inaccurate: only _some_ implementations are not implemented as a sink.

Also, you can in fact instantiate the sinks yourself, as in
readings.addSink(new CassandraTupleSink(<query>, <builder>));

(Follow-up to Chesnay Schepler's message of 22.11.2016 09:30.)

Re: Cassandra Connector

kaelumania
Hey Chesnay,

That looks good. I'd like to use the same mechanism for all my sinks, so this will be my preferred way:

readings.addSink(new CassandraTupleSink(<query>, <builder>));

best, Stephan


(In reply to Chesnay Schepler's message of 22 Nov 2016, 09:33.)