DataStream, Sink and JDBC


toletum
Hi!
I'm building a job that reads from Kafka, does some processing, and then writes to a database (Neo4j). Reading from Kafka and the processing work fine, but I have a problem writing to the database over JDBC.
I tried using a SinkFunction. It works, but it creates a new connection every time the invoke method is called.


--------
DataStream<String> messageStream = this.env.addSource(new FlinkKafkaConsumer082<>(properties.getProperty("topic"), new SimpleStringSchema(), properties));


messageStream.map(new StreamingCrimeSplitter())
        .filter(new filterFunction())
        .keyBy(1)
        .addSink(new sinkFunction());



--------
--------
public class sinkFunction
        implements SinkFunction<Tuple7<String, String, String, String, String, String, String>> {

        private static final long serialVersionUID = 2859601213304525959L;

        @Override
        public void invoke(Tuple7<String, String, String, String, String, String, String> crime) throws Exception {
                System.out.println(crime.f0);
                // The JDBC connection is created here, on every call
        }
}

--------


Does anybody know how I could use just one connection? I tried creating it in the constructor, but the JDBC connection is not serializable.


Thanks
Toletum


Re: DataStream, Sink and JDBC

Chiwan Park-2
Hi Toletum,

You can initialize the JDBC connection by using a RichSinkFunction [1]. It adds two lifecycle methods, `open` and `close`. The `open` method is called once, before the first call to `invoke`. The `close` method is called last, when the sink shuts down.

Note that you should add the `transient` keyword to the JDBC connection field, so that the connection is not serialized along with the function object.

Regards,
Chiwan Park

[1]: https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.html
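
The role of `transient` can be illustrated without Flink. Flink ships user functions to the workers with Java serialization, so any non-transient field must itself be serializable. The following is only a minimal sketch: `TransientDemo`, `FakeConnection`, `EagerSink` and `LazySink` are hypothetical names made up for this illustration, and `FakeConnection` merely stands in for a real JDBC connection.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {

    /** Hypothetical stand-in for a JDBC connection: it is not Serializable. */
    static class FakeConnection {
    }

    /** Creates the connection eagerly and keeps it in a regular field. */
    static class EagerSink implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeConnection connection = new FakeConnection();
    }

    /** Keeps the connection in a transient field that is only set in open(). */
    static class LazySink implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient FakeConnection connection;

        void open() {
            connection = new FakeConnection();
        }

        boolean isOpen() {
            return connection != null;
        }
    }

    /** Returns true if Java serialization accepts the object. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The eager variant fails because of its non-serializable field.
        System.out.println("eager sink serializable: " + canSerialize(new EagerSink()));
        // The transient field is skipped, so the lazy variant serializes.
        System.out.println("lazy sink serializable: " + canSerialize(new LazySink()));
    }
}
```

Running `main` prints `false` for the eager variant and `true` for the lazy one, which mirrors why a connection created in the constructor fails while one created in `open` does not.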

> On Mar 7, 2016, at 10:08 PM, [hidden email] wrote:
> [...]

Re: DataStream, Sink and JDBC

Chesnay Schepler
In reply to this post by toletum
You'll have to change your sink function to extend RichSinkFunction, and then create your JDBC connection within the open method.
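
A rough sketch of what such a sink might look like is given here, assuming the Flink 1.x API of this thread's era, where `open` takes a `Configuration` and `invoke` takes only the value. The class name `JdbcCrimeSink`, the constructor parameters, and the idea of a statement with seven `?` placeholders are assumptions made for illustration; the actual JDBC URL and statement depend on the driver in use.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Hypothetical sink that opens one JDBC connection per parallel sink instance. */
public class JdbcCrimeSink
        extends RichSinkFunction<Tuple7<String, String, String, String, String, String, String>> {

    private static final long serialVersionUID = 1L;

    // Plain strings are serializable, so they can travel with the function.
    private final String jdbcUrl;
    private final String statementText; // assumed to contain seven '?' placeholders

    // Not serializable, hence transient; both are created in open().
    private transient Connection connection;
    private transient PreparedStatement statement;

    public JdbcCrimeSink(String jdbcUrl, String statementText) {
        this.jdbcUrl = jdbcUrl;
        this.statementText = statementText;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel instance, before the first invoke().
        connection = DriverManager.getConnection(jdbcUrl);
        statement = connection.prepareStatement(statementText);
    }

    @Override
    public void invoke(Tuple7<String, String, String, String, String, String, String> crime)
            throws Exception {
        // Reuses the connection and statement created in open().
        statement.setString(1, crime.f0);
        statement.setString(2, crime.f1);
        statement.setString(3, crime.f2);
        statement.setString(4, crime.f3);
        statement.setString(5, crime.f4);
        statement.setString(6, crime.f5);
        statement.setString(7, crime.f6);
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        // Called when the sink shuts down; release resources if open() got that far.
        try {
            if (statement != null) {
                statement.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
```

In the original pipeline this would replace the last call, for example `.addSink(new JdbcCrimeSink(url, statementText))`, where `url` and `statementText` are placeholders for the real values. Each parallel instance of the sink opens its own connection, so the job holds as many connections as the sink's parallelism.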


Re[2]: DataStream, Sink and JDBC

toletum
In reply to this post by Chiwan Park-2
Thanks Chiwan and Chesnay


I'm happy :-)




On Mon, Mar 7, 2016 at 14:18, Chiwan Park <[hidden email]> wrote:
[...]