Hi!
I'm building a job that reads from Kafka, does some processing, and then writes to a database (Neo4j). Reading from Kafka and the processing both work, but I have a problem writing to the database over JDBC.
I tried using a SinkFunction. It works, but it creates a new connection every time the invoke method is called.

--------
DataStream<String> messageStream = this.env.addSource(
    new FlinkKafkaConsumer082<>(properties.getProperty("topic"), new SimpleStringSchema(), properties));

messageStream.map(new StreamingCrimeSplitter())
    .filter(new filterFunction())
    .keyBy(1)
    .addSink(new sinkFunction());
--------
--------
public class sinkFunction
        implements SinkFunction<Tuple7<String, String, String, String, String, String, String>> {
    private static final long serialVersionUID = 2859601213304525959L;

    @Override
    public void invoke(Tuple7<String, String, String, String, String, String, String> crime) throws Exception {
        System.out.println(crime.f0);
        // JDBC connection
    }
}
--------

Does anybody know how I could create just one connection? I tried doing it in the constructor, but the JDBC connection is not serializable.

Thanks
Toletum
|
Hi Toletum,
You can initialize the JDBC connection in a RichSinkFunction [1]. It has two lifecycle methods, `open` and `close`. The `open` method is called once, before any call to `invoke`. The `close` method is called once at the end. Note that you should add the `transient` keyword to the JDBC connection field.

Regards,
Chiwan Park

[1]: https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.html
|
In reply to this post by toletum
You'll have to change your sink function to extend RichSinkFunction, and then create your JDBC connection within the open method.
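
For illustration, here is a minimal sketch of the pattern described in the replies, written so that it runs without Flink or a JDBC driver on the classpath. `LifecycleSink`, `FakeConnection`, `CrimeSink` and `SinkLifecycleDemo` are hypothetical stand-ins, not Flink or JDBC classes. The demo serializes the sink (which is why the connection cannot be created in the constructor), restores it, and then opens a single connection in `open`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the open/invoke/close lifecycle of a rich sink.
abstract class LifecycleSink<T> implements Serializable {
    private static final long serialVersionUID = 1L;
    public void open() throws Exception {}
    public abstract void invoke(T value) throws Exception;
    public void close() throws Exception {}
}

// Hypothetical stand-in for a JDBC connection; deliberately NOT Serializable.
class FakeConnection implements AutoCloseable {
    static int opened = 0;                        // how many connections were created
    final List<String> written = new ArrayList<>();
    FakeConnection() { opened++; }
    void write(String row) { written.add(row); }
    @Override public void close() {}
}

class CrimeSink extends LifecycleSink<String> {
    private static final long serialVersionUID = 1L;

    // transient: skipped when the sink object itself is serialized.
    private transient FakeConnection connection;

    @Override public void open() { connection = new FakeConnection(); }       // once, before invoke
    @Override public void invoke(String crime) { connection.write(crime); }  // reuses the connection
    @Override public void close() {                                          // once, at the end
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }
}

class SinkLifecycleDemo {
    // Serializes and restores the sink, runs the lifecycle over the records,
    // and returns the number of connections that were opened.
    static int run(String... records) {
        try {
            FakeConnection.opened = 0;
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new CrimeSink());
            }
            CrimeSink shipped;
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                shipped = (CrimeSink) in.readObject();
            }
            shipped.open();
            for (String record : records) {
                shipped.invoke(record);
            }
            shipped.close();
            return FakeConnection.opened;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connections opened: " + run("a", "b", "c"));
    }
}
```

In the actual job, the sink would presumably extend RichSinkFunction with the Tuple7 type from the original post, override `open(Configuration parameters)` and `close()`, and create the real connection in `open`, for example with `java.sql.DriverManager.getConnection(...)`. The connection URL and credentials are not given in this thread.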
|
In reply to this post by Chiwan Park-2
Thanks Chiwan and Chesnay
I'm happy :-)

On Mon, Mar 7, 2016 at 14:18, Chiwan Park <[hidden email]> wrote:
|