Hi there,
I have a Flink streaming app where my source is Kafka and a custom sink to Cassandra (I can't use the standard C* sink that comes with Flink, as I have customized auth to C*). I currently have the following:

    messageStream
        .map(s -> { return mapper.readValue(s, JsonNode.class); })
        .filter(//filter some messages)
        .map((MapFunction<JsonNode, String>) message -> { getDbSession().execute("QUERY_TO_EXEC") })

    private static Session getDbSession() {

Thanks
You haven't really added a sink in Flink terminology; you're just performing a side effect within a map operator. So while it may work, if you want to add a proper sink you need to have an object that extends SinkFunction or RichSinkFunction. The method call on the stream should be ".addSink(…)".
Also, the dbSession isn't really Flink state, as it does not vary with the position in the stream or with the stream's content. It's a necessary helper object, yes, but you don't need Flink to checkpoint it.
You can still use the sinks provided with flink-connector-cassandra and customize the cluster building by passing your own ClusterBuilder into the constructor.
-Shannon
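A minimal sketch of the sink lifecycle described above, written in plain Java so it runs without Flink on the classpath. `LifecycleSink`, `SessionSink` and `FakeSession` are hypothetical stand-ins, not Flink or DataStax classes; they only mirror the open/invoke/close shape of a rich sink, where the session is created once and then reused for every record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Cassandra session; it only records what it is asked to run.
final class FakeSession {
    final List<String> executed = new ArrayList<>();
    boolean closed = false;

    void execute(String query) { executed.add(query); }
    void close() { closed = true; }
}

// Hypothetical stand-in for the open/invoke/close lifecycle of a rich sink.
abstract class LifecycleSink<T> {
    void open() {}
    abstract void invoke(T value);
    void close() {}
}

// The session is a helper object: created once in open(), reused per record, never checkpointed.
final class SessionSink extends LifecycleSink<String> {
    int sessionsOpened = 0;
    FakeSession session;

    @Override
    void open() {
        session = new FakeSession();
        sessionsOpened++;
    }

    @Override
    void invoke(String value) {
        session.execute("QUERY_TO_EXEC for " + value);
    }

    @Override
    void close() {
        if (session != null) {
            session.close();
        }
    }
}

final class SinkLifecycleDemo {
    // Drives one sink instance: open once, invoke per record, close once.
    static SessionSink run(List<String> records) {
        SessionSink sink = new SessionSink();
        sink.open();
        try {
            for (String record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close();
        }
        return sink;
    }

    public static void main(String[] args) {
        SessionSink sink = run(Arrays.asList("a", "b", "c"));
        System.out.println("sessions opened: " + sink.sessionsOpened);
        System.out.println("queries executed: " + sink.session.executed.size());
    }
}
```

In an actual job the same shape would live in a class extending RichSinkFunction and be attached to the stream with ".addSink(…)" instead of a second map.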
From: Meghashyam Sandeep V <[hidden email]>
Date: Friday, December 9, 2016 at 12:26 PM
To: <[hidden email]>, <[hidden email]>
Subject: Reg. custom sinks in Flink
Thanks a lot for the quick reply, Shannon.

1. I will create a class that extends SinkFunction and write my connection logic there. My only question here is: will a dbSession be created for each message/partition, which might affect performance? That's the reason why I added this line to create a connection once and use it along the datastream:

    if (dbSession == null && store != null) { dbSession = getSession(); }

2. I couldn't use flink-connector-cassandra as I have SSL enabled for my C* cluster, and I couldn't get it to work with all my SSL config (truststore, keystore, etc.) added to cluster building. I didn't find a proper example of flink-connector-cassandra with SSL enabled.

Thanks

On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <[hidden email]> wrote:
Hi Meghashyam,
Cheers,

On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V <[hidden email]> wrote:
Hi Till,

Thanks for the information.

1. What do you mean by 'subtask'? Is it every partition or every message in the stream?

2. I tried using CassandraSink with a Pojo. Is there a way to specify a TTL, as I can't use a query when I have a datastream with a Pojo?

    CassandraSink.addSink(messageStream)

Thanks,

On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann <[hidden email]> wrote:
Regarding 2): I don't think so. That would require access to the datastax MappingManager. We could add something similar to the ClusterBuilder for that, though.

Regards,
Chesnay

On 12.12.2016 16:15, Meghashyam Sandeep V wrote:
Data piles up in Cassandra without a TTL. Is there a workaround for this problem? Is there a way to specify my query and still use a Pojo?

Thanks,

On Mon, Dec 12, 2016 at 7:26 AM, Chesnay Schepler <[hidden email]> wrote:
(1) A subtask is a parallel instance of an operator and thus responsible for a partition (possibly infinite) of the whole DataStream/DataSet.

(2) Maybe you can add this feature to Flink's Cassandra Sink.

Cheers,
Till

On Mon, Dec 12, 2016 at 4:30 PM, Meghashyam Sandeep V <[hidden email]> wrote:
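To make the subtask definition above concrete, here is a small plain-Java simulation (not Flink's runtime). `SubtaskSink` is a hypothetical stand-in for one parallel instance of a sink, and routing by key hash is an assumption chosen for illustration. The point is that a connection set up per subtask is opened as many times as the parallelism, not once per message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for one parallel instance (subtask) of a sink operator.
final class SubtaskSink {
    final int index;
    int sessionsOpened = 0;
    final List<String> received = new ArrayList<>();

    SubtaskSink(int index) { this.index = index; }

    // Called once per subtask, so one connection per subtask.
    void open() { sessionsOpened++; }

    // Called once per message routed to this subtask.
    void invoke(String message) { received.add(message); }
}

final class SubtaskDemo {
    // Creates `parallelism` subtasks and routes each message to one of them by hash.
    static List<SubtaskSink> run(List<String> messages, int parallelism) {
        List<SubtaskSink> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            SubtaskSink subtask = new SubtaskSink(i);
            subtask.open();
            subtasks.add(subtask);
        }
        for (String message : messages) {
            int target = Math.floorMod(message.hashCode(), parallelism);
            subtasks.get(target).invoke(message);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<SubtaskSink> subtasks =
                run(Arrays.asList("m1", "m2", "m3", "m4", "m5", "m6"), 2);
        for (SubtaskSink subtask : subtasks) {
            System.out.println("subtask " + subtask.index
                    + ": sessions=" + subtask.sessionsOpened
                    + ", messages=" + subtask.received.size());
        }
    }
}
```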
Thank you, Till. I wanted to contribute to Flink, and this looks like it could be a good start. I couldn't find the place where the insert query is built for Pojo sinks in CassandraSink.java, CassandraPojoSink.java, or CassandraSinkBase.java. Could you shed some light on how that insert query is built automatically by the sink?

Thanks,

On Mon, Dec 12, 2016 at 7:56 AM, Till Rohrmann <[hidden email]> wrote:
Hello,

the query is generated automatically from the pojo by the datastax MappingManager in the CassandraPojoSink; Flink isn't generating anything itself. On the MappingManager you can set the TTL for all queries (it also allows some other stuff). So, to allow the user to set the TTL we must add a hook to configure the MappingManager; this can be done the same way the Cluster is configured using the ClusterBuilder.

Regards,
Chesnay

On 12.12.2016 19:12, Meghashyam Sandeep V wrote:
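One way the hook proposed above could look, sketched in plain Java. Everything here is hypothetical: `MapperConfigurer`, `StubMapper` and `StubPojoSink` are illustration-only names, not part of Flink or the DataStax driver, and the stub only models a single default TTL. The idea is simply that the sink accepts a small serializable object from the user and calls it when it sets up its mapper, in the same spirit as the ClusterBuilder.

```java
import java.io.Serializable;

// Hypothetical stand-in for the DataStax mapping layer; it only models a default TTL.
final class StubMapper {
    int defaultTtlSeconds = 0; // 0 means "no TTL" in this stub
}

// Hypothetical hook: user code that the sink calls while setting up its mapper.
abstract class MapperConfigurer implements Serializable {
    private static final long serialVersionUID = 1L;

    abstract void configure(StubMapper mapper);
}

// Hypothetical stand-in for a POJO sink that accepts the hook in its constructor.
final class StubPojoSink {
    private final MapperConfigurer configurer; // null when no customization is wanted
    StubMapper mapper;

    StubPojoSink(MapperConfigurer configurer) {
        this.configurer = configurer;
    }

    // Creates the mapper and lets the user-supplied hook adjust it.
    void open() {
        mapper = new StubMapper();
        if (configurer != null) {
            configurer.configure(mapper);
        }
    }
}

final class TtlHookDemo {
    // Builds and opens a sink whose hook sets the given default TTL.
    static StubPojoSink sinkWithTtl(final int ttlSeconds) {
        StubPojoSink sink = new StubPojoSink(new MapperConfigurer() {
            @Override
            void configure(StubMapper mapper) {
                mapper.defaultTtlSeconds = ttlSeconds;
            }
        });
        sink.open();
        return sink;
    }

    public static void main(String[] args) {
        StubPojoSink sink = sinkWithTtl(3600);
        System.out.println("default TTL: " + sink.mapper.defaultTtlSeconds);
    }
}
```

Making the hook serializable is a design assumption borrowed from how user functions are shipped to the workers; the real change would have to follow whatever the connector already does for ClusterBuilder.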