Confusions About JDBCOutputFormat

wangsan
Hi all,

I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink application, but I'm confused by the implementation of JDBCOutputFormat.

1. The Connection is established when JDBCOutputFormat is opened and is then reused for as long as the format runs. But if this connection sits idle for a long time, the database may forcibly close it, and errors may occur on the next write.
2. The flush() method is called when batchCount exceeds the threshold, but it is also called while snapshotting state. So two threads may modify the upload statement and batchCount concurrently, without any synchronization.
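To make point 2 concrete, here is a minimal, hypothetical model of the batching pattern described above; it is not the actual JDBCOutputFormat source. UnsafeBatchingWriter is a made-up name, a Consumer<List<String>> stands in for the PreparedStatement's executeBatch(), and batchInterval is an assumed name for the threshold. Both call paths reach flush(), and nothing guards the shared buffer or batchCount.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of the batching described in point 2.
// Not the actual JDBCOutputFormat source: a Consumer stands in for the
// PreparedStatement's executeBatch(), and a List plays the role of the batch.
class UnsafeBatchingWriter {
    private final int batchInterval;                 // flush threshold (assumed name)
    private final Consumer<List<String>> sink;       // stand-in for executeBatch()
    private final List<String> pending = new ArrayList<>();
    private int batchCount = 0;

    UnsafeBatchingWriter(int batchInterval, Consumer<List<String>> sink) {
        this.batchInterval = batchInterval;
        this.sink = sink;
    }

    // Call path 1: the task thread, once per incoming record.
    void writeRecord(String row) {
        pending.add(row);
        batchCount++;
        if (batchCount >= batchInterval) {
            flush();
        }
    }

    // Call path 2: also invoked when state is snapshotted. If that happens on
    // another thread while writeRecord() is running, both paths read and
    // mutate pending and batchCount with no lock, so rows can be lost or
    // flushed twice.
    void flush() {
        sink.accept(new ArrayList<>(pending));
        pending.clear();
        batchCount = 0;
    }
}
```

Whether the two paths can actually run on different threads depends on how the runtime invokes the snapshot; the model only shows that, if they can, nothing protects the shared state.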

Please correct me if I am wrong.

——
wangsan

Re: Confusions About JDBCOutputFormat

Hequn Cheng
Hi wangsan,

I agree with you. It would be great if you could open a JIRA issue to track the problem.

For the first problem, I think we need to establish a connection each time we execute a batch write, and it would be better to get that connection from a connection pool.
For the second problem, to avoid multithreading issues, I think we should synchronize on the batch object in the flush() method.
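A minimal sketch of the proposed synchronization, using a simplified, hypothetical batching writer (SafeBatchingWriter and its Consumer sink are illustrative stand-ins, not Flink API). Locking only inside flush() would not be enough, because writeRecord() also mutates the buffer and the counter, so both methods synchronize on the same monitor. Java's intrinsic locks are re-entrant, so the nested flush() call from writeRecord() does not deadlock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical batching writer in which every method touching the shared
// batch state holds the same monitor (this). A snapshot-triggered flush()
// therefore cannot interleave with a writeRecord() on another thread.
class SafeBatchingWriter {
    private final int batchInterval;                 // flush threshold
    private final Consumer<List<String>> sink;       // stand-in for executeBatch()
    private final List<String> pending = new ArrayList<>();
    private int batchCount = 0;

    SafeBatchingWriter(int batchInterval, Consumer<List<String>> sink) {
        this.batchInterval = batchInterval;
        this.sink = sink;
    }

    synchronized void writeRecord(String row) {
        pending.add(row);
        batchCount++;
        if (batchCount >= batchInterval) {
            flush(); // re-entrant: this thread already holds the monitor
        }
    }

    synchronized void flush() {
        if (pending.isEmpty()) {
            return; // e.g. a snapshot right after a size-triggered flush
        }
        sink.accept(new ArrayList<>(pending));
        pending.clear();
        batchCount = 0;
    }
}
```

With this locking, a flush() triggered from another thread runs either entirely before or entirely after a given writeRecord() call, never in the middle of it.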

What do you think?

Best, Hequn



On Tue, Jul 10, 2018 at 2:36 PM, wangsan wrote:

Re: Confusions About JDBCOutputFormat

wangsan
Hi Hequn,

Establishing a connection for each batch write may still run into the idle-connection problem, since we cannot be sure when the connection will be closed. We call flush() when a batch is full or when state is snapshotted, but what if snapshotting is not enabled and the batch size is not reached before the connection is closed?

Maybe we could use a Timer to test the connection periodically and keep it alive. What do you think?
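A minimal sketch of the periodic test-and-refresh idea, assuming a hypothetical ConnectionFactory that (re)opens connections, for example by wrapping DriverManager.getConnection(). It swaps java.util.Timer for a ScheduledExecutorService and uses the standard JDBC Connection.isValid(int) call as the test; KeepAliveConnection and the 5-second validation timeout are illustrative choices.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical hook for (re)opening a connection, e.g. a lambda wrapping
// DriverManager.getConnection(url, user, password).
interface ConnectionFactory {
    Connection open() throws SQLException;
}

// Holds one long-lived connection and tests it on a fixed schedule,
// replacing it if the database has dropped it.
class KeepAliveConnection implements AutoCloseable {
    private static final int VALIDATION_TIMEOUT_SECONDS = 5;

    private final ConnectionFactory factory;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "jdbc-keepalive");
                t.setDaemon(true); // never keep the JVM alive on its own
                return t;
            });
    private Connection connection;

    KeepAliveConnection(ConnectionFactory factory, long periodMillis) throws SQLException {
        this.factory = factory;
        this.connection = factory.open();
        scheduler.scheduleAtFixedRate(
                this::checkOrReopen, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // isValid() asks the driver to verify the connection; a closed or
    // dropped connection reports false.
    synchronized void checkOrReopen() {
        try {
            if (!connection.isValid(VALIDATION_TIMEOUT_SECONDS)) {
                try {
                    connection.close();
                } catch (SQLException ignored) {
                    // already broken; nothing to release
                }
                connection = factory.open();
                // A real sink would re-create its PreparedStatement here,
                // because statements belong to the old connection.
            }
        } catch (SQLException e) {
            // Swallow: an exception escaping a scheduled task cancels its
            // future runs. The next tick retries.
        }
    }

    synchronized Connection get() {
        return connection;
    }

    @Override
    public synchronized void close() throws SQLException {
        scheduler.shutdownNow();
        connection.close();
    }
}
```

Whether isValid() also resets the server's idle timeout depends on how the driver implements the check, so that is worth confirming for the target database.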

I will open a JIRA issue and try to work on it.

Best, 
wangsan



On Jul 10, 2018, at 8:38 PM, Hequn Cheng wrote:


Re: Confusions About JDBCOutputFormat

Hequn Cheng
Hi wangsan, 

What I mean is establishing a connection each time we write data to JDBC, i.e., establishing the connection in the flush() function. I think this will ensure the connection is valid whenever it is used. What do you think?
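A minimal sketch of this approach, assuming rows are buffered in memory between writes and connections come from a javax.sql.DataSource, which is where a connection pool would plug in. PerFlushConnectionWriter and the single-parameter insert statement are hypothetical.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;

// Buffers rows in memory and borrows a connection only for the duration of
// flush(). PerFlushConnectionWriter and the one-parameter insert SQL are
// hypothetical; a real sink would bind every column of each row.
class PerFlushConnectionWriter {
    private final DataSource dataSource;
    private final String insertSql;               // e.g. "INSERT INTO t (v) VALUES (?)"
    private final List<String> buffer = new ArrayList<>();

    PerFlushConnectionWriter(DataSource dataSource, String insertSql) {
        this.dataSource = dataSource;
        this.insertSql = insertSql;
    }

    // Rows received since the last write are cached here.
    synchronized void writeRecord(String value) {
        buffer.add(value);
    }

    synchronized void flush() throws SQLException {
        if (buffer.isEmpty()) {
            return; // nothing to write, so no connection is needed
        }
        // try-with-resources closes the statement and the connection; with a
        // pooling DataSource, closing returns the connection to the pool.
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(insertSql)) {
            for (String value : buffer) {
                stmt.setString(1, value);
                stmt.addBatch();
            }
            stmt.executeBatch();
        }
        // Cleared only after executeBatch() succeeds, so a failed flush keeps
        // its rows for the next attempt.
        buffer.clear();
    }

    synchronized int buffered() {
        return buffer.size();
    }
}
```

Because the buffer is cleared only after a successful write, rows accumulate in memory for as long as no flush happens, which is the cost of not holding a connection between writes.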

On Wed, Jul 11, 2018 at 12:12 AM, wangsan wrote:

Re: Confusions About JDBCOutputFormat

wangsan
Well, I see. If the connection is only established when writing data to the DB, we need to cache the rows received since the last write.

IMO, we may not need to open connections repeatedly or introduce a connection pool. Testing and refreshing the connection periodically can solve this problem simply. I've implemented this at https://github.com/apache/flink/pull/6301; it would be great if you could review it.

Best,
wangsan


On Jul 11, 2018, at 2:25 PM, Hequn Cheng wrote:


Re: Confusions About JDBCOutputFormat

Hequn Cheng
Cool. I will take a look. Thanks.

On Wed, Jul 11, 2018 at 7:08 PM, wangsan wrote: