ConnectionPool to DB and parallelism of operator question

Vijay Balakrishnan
Hi,
A basic question on operator parallelism and a ConnectionPool to a DB:
will this result in 82 * 300 connections to InfluxDB, or just 300 connections to InfluxDB?
main() {
  sink = ....createInfluxMonitoringSink(..);
  // Will this result in 82 * 300 connections to InfluxDB or just 300 connections to InfluxDB?
  keyStream.addSink(sink).setParallelism(82);
}


private ..... createInfluxMonitoringSink(...) {
  final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder()
      .readTimeout(timeout, TimeUnit.MILLISECONDS)
      .connectTimeout(timeout, TimeUnit.MILLISECONDS)
      .writeTimeout(timeout, TimeUnit.MILLISECONDS)
      // Keep up to 300 idle connections alive for 60 seconds each.
      .connectionPool(new ConnectionPool(300, 60, TimeUnit.SECONDS));

  try (InfluxDB influxDB = InfluxDBFactory.connect(host, userName, pwd, okHttpClientBuilder)) { ......}
}

TIA,

Re: ConnectionPool to DB and parallelism of operator question

Aljoscha Krettek
Hi,

Since I don't know the implementation of the sink, I can only guess. I
would say you get up to 82 * 300 connections: with a parallelism of 82
you will get 82 instances of the sink operator, and each of those would
then have its own connection pool of 300 connections. The individual sink
instances will (potentially) run on different machines, so they cannot
all share one connection pool.

Best,
Aljoscha
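
The arithmetic in the reply above can be sketched in plain Java. This is only an illustration under stated assumptions: StandInPool and StandInSink are hypothetical stand-ins, not the OkHttp or Flink classes, and the sketch assumes every parallel sink instance ends up constructing its own pool. Note that in OkHttp the first ConnectionPool argument is the maximum number of idle connections, so the result is an upper bound on pooled connections, not a number opened up front.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a per-instance HTTP connection pool (not the OkHttp class).
final class StandInPool {
    final int maxIdleConnections;

    StandInPool(int maxIdleConnections) {
        this.maxIdleConnections = maxIdleConnections;
    }
}

// Hypothetical stand-in for one parallel sink instance that builds its own pool.
final class StandInSink {
    final StandInPool pool = new StandInPool(300);
}

final class PoolCountSketch {
    // Upper bound on pooled connections when every sink instance owns a pool.
    static int upperBound(int parallelism) {
        List<StandInSink> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new StandInSink());
        }
        int total = 0;
        for (StandInSink sink : instances) {
            total += sink.pool.maxIdleConnections;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(upperBound(82)); // prints 24600, i.e. 82 * 300
    }
}
```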

In reply to Vijay Balakrishnan, 05.10.20 22:28.

Re: ConnectionPool to DB and parallelism of operator question

Arvid Heise-3
Hi Vijay,

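If you implement the SinkFunction yourself, you can share the OkHttpClient.Builder (and with it the ConnectionPool it holds) across all sink instances in the same TaskManager by keeping it in a static field and initializing it only once (ideally in RichSinkFunction#open). Since several sink instances may call open concurrently in the same JVM, guard that initialization with a lock.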
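
A minimal sketch of the static-field idea, in plain Java so that it runs without Flink or OkHttp on the classpath. Everything here is an assumption made for illustration: SharedClient stands in for whatever you would share (for example the OkHttpClient.Builder), and SharingSink.open() and close() only mimic the role of RichSinkFunction#open and #close. The reference count is one possible way to drop the shared object once the last instance closes; it is not something the thread prescribes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the expensive shared object (e.g. a client builder with its pool).
final class SharedClient {
}

// Hypothetical stand-in for a sink; open()/close() mimic the rich-function lifecycle.
final class SharingSink {
    private static SharedClient shared; // one per JVM (more precisely, per class loader)
    private static int users;           // how many open sinks currently use it

    private SharedClient client;

    void open() {
        synchronized (SharingSink.class) {
            if (shared == null) {
                shared = new SharedClient(); // created by whichever instance opens first
            }
            users++;
            client = shared;
        }
    }

    void close() {
        synchronized (SharingSink.class) {
            if (client == null) {
                return; // never opened, or already closed
            }
            client = null;
            if (--users == 0) {
                shared = null; // last user gone, release the shared object
            }
        }
    }

    SharedClient client() {
        return client;
    }
}

final class SharedClientSketch {
    // Opens sinkCount sinks on separate threads and counts the distinct clients they hold.
    static int distinctClients(int sinkCount) {
        List<SharingSink> sinks = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < sinkCount; i++) {
            SharingSink sink = new SharingSink();
            sinks.add(sink);
            Thread thread = new Thread(sink::open);
            threads.add(thread);
            thread.start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        Set<SharedClient> distinct =
                Collections.newSetFromMap(new IdentityHashMap<SharedClient, Boolean>());
        for (SharingSink sink : sinks) {
            distinct.add(sink.client());
        }
        for (SharingSink sink : sinks) {
            sink.close();
        }
        return distinct.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctClients(8)); // prints 1
    }
}
```

A static field is only shared within one JVM, so with this pattern a job spread over several TaskManagers would still hold one pool per TaskManager rather than one pool overall.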

In reply to Aljoscha Krettek, Tue, Oct 6, 2020 at 9:37 AM.

--

Arvid Heise | Senior Java Developer

