Backpressure in the context of JDBCOutputFormat update

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Backpressure in the context of JDBCOutputFormat update

Maximilian Bode
Hi everyone,

in a Flink (0.10.1) job with two JDBCOutputFormat sinks, one of them (doing a database update) is performing slower than the other one (an insert). The job as a whole is also slow as upstream operators are slowed down due to backpressure. I am able to speed up the whole job by introducing an a priori unnecessary .distinct(), which of course blocks downstream execution of the slow sink, which in turn seems to be able to execute faster when given all data at once.

Any ideas what is going on here? Is there something I can do without introducing unnecessary computation steps?

Cheers,
Max
— 
Maximilian Bode * Junior Consultant * [hidden email] * 0176 1000 75 50
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Reply | Threaded
Open this post in threaded view
|

Re: Backpressure in the context of JDBCOutputFormat update

rmetzger0
Hi Max,

is the distinct() operation reducing the size of the DataSet? If so, I assume you have an idempotent update and the job is faster because fewer updates are done?
if the distinct() operator is not changing anything, then, the job might be faster because the INSERT is done while Flink is still executing the distinct() operation. So the insert is over when the updates are starting. This would mean that concurrent inserts and updates on the database are much slower than doing this sequentially.

I'm wondering if there is a way in Flink to explicitly ask for spilling an intermediate operator to "pause" execution:

Source ----- > (spill for pausing) ---> (update sink)
        \
         ------- > (insert)

I don't have a lot of practical experience with RDBMS, but I guess updates are slower because an index lookup + update is necessary. Maybe optimizing the database configuration / schema / indexes is more promising. I think its indeed much nicer to avoid any unnecessary steps in Flink.

Did you do any "microbenchmarks" for the update and insert part? I guess that would help a lot to understand the impact of certain index structures, batching sizes, or database drivers.

Regards,
Robert




On Thu, Jan 21, 2016 at 3:35 PM, Maximilian Bode <[hidden email]> wrote:
Hi everyone,

in a Flink (0.10.1) job with two JDBCOutputFormat sinks, one of them (doing a database update) is performing slower than the other one (an insert). The job as a whole is also slow as upstream operators are slowed down due to backpressure. I am able to speed up the whole job by introducing an a priori unnecessary .distinct(), which of course blocks downstream execution of the slow sink, which in turn seems to be able to execute faster when given all data at once.

Any ideas what is going on here? Is there something I can do without introducing unnecessary computation steps?

Cheers,
Max
— 
Maximilian Bode * Junior Consultant * [hidden email] * <a href="tel:0176%201000%2075%2050" value="+4917610007550" target="_blank">0176 1000 75 50
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Reply | Threaded
Open this post in threaded view
|

Re: Backpressure in the context of JDBCOutputFormat update

Maximilian Bode
Hi Robert,
sorry, I should have been clearer in my initial mail. The two cases I was comparing are:

1) distinct() before Insert (which is necessary as we have a unique key constraint in our database), no distinct() before update
2) distinct() before insert AND distinct() before update

The test data used actually only contains unique values for the affected field though, so the dataset size is not reduced in case 2.

In case 1 the insert does not start until all the data has arrived at distinct() while the update is already going along (slowing down upstream operators as well). In case 2 both sinks wait for their respective distinct()'s (which is reached much faster now), then start roughly at the same time leading to a shorter net job time for job 2 as compared to 1.

A pause operator might be useful, yes.

The update should not be an inherently much more expensive operation, as the WHERE clause only contains the table's primary key.

Cheers,
Max
— 
Maximilian Bode * Junior Consultant * [hidden email] * 0176 1000 75 50
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Am 21.01.2016 um 15:57 schrieb Robert Metzger <[hidden email]>:

Hi Max,

is the distinct() operation reducing the size of the DataSet? If so, I assume you have an idempotent update and the job is faster because fewer updates are done?
if the distinct() operator is not changing anything, then, the job might be faster because the INSERT is done while Flink is still executing the distinct() operation. So the insert is over when the updates are starting. This would mean that concurrent inserts and updates on the database are much slower than doing this sequentially.

I'm wondering if there is a way in Flink to explicitly ask for spilling an intermediate operator to "pause" execution:

Source ----- > (spill for pausing) ---> (update sink)
        \
         ------- > (insert)

I don't have a lot of practical experience with RDBMS, but I guess updates are slower because an index lookup + update is necessary. Maybe optimizing the database configuration / schema / indexes is more promising. I think its indeed much nicer to avoid any unnecessary steps in Flink.

Did you do any "microbenchmarks" for the update and insert part? I guess that would help a lot to understand the impact of certain index structures, batching sizes, or database drivers.

Regards,
Robert




On Thu, Jan 21, 2016 at 3:35 PM, Maximilian Bode <[hidden email]> wrote:
Hi everyone,

in a Flink (0.10.1) job with two JDBCOutputFormat sinks, one of them (doing a database update) is performing slower than the other one (an insert). The job as a whole is also slow as upstream operators are slowed down due to backpressure. I am able to speed up the whole job by introducing an a priori unnecessary .distinct(), which of course blocks downstream execution of the slow sink, which in turn seems to be able to execute faster when given all data at once.

Any ideas what is going on here? Is there something I can do without introducing unnecessary computation steps?

Cheers,
Max
— 
Maximilian Bode * Junior Consultant * [hidden email] * <a href="tel:0176%201000%2075%2050" value="+4917610007550" target="_blank" class="">0176 1000 75 50
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



Reply | Threaded
Open this post in threaded view
|

Re: Backpressure in the context of JDBCOutputFormat update

rmetzger0
Hi,

have you thought about making two independent jobs out of this? (or you call execute() for the two separate parts)
One job for the update() and one for the insert() ?

Even though the update operation should not be expensive, I think its helpful to understand the performance impact of having concurrent insert / updates vs executing these operations sequentially ?
Are the inserts / updates performed on the same table?





On Thu, Jan 21, 2016 at 4:17 PM, Maximilian Bode <[hidden email]> wrote:
Hi Robert,
sorry, I should have been clearer in my initial mail. The two cases I was comparing are:

1) distinct() before Insert (which is necessary as we have a unique key constraint in our database), no distinct() before update
2) distinct() before insert AND distinct() before update

The test data used actually only contains unique values for the affected field though, so the dataset size is not reduced in case 2.

In case 1 the insert does not start until all the data has arrived at distinct() while the update is already going along (slowing down upstream operators as well). In case 2 both sinks wait for their respective distinct()'s (which is reached much faster now), then start roughly at the same time leading to a shorter net job time for job 2 as compared to 1.

A pause operator might be useful, yes.

The update should not be an inherently much more expensive operation, as the WHERE clause only contains the table's primary key.

Cheers,
Max
— 
Maximilian Bode * Junior Consultant * [hidden email] * <a href="tel:0176%201000%2075%2050" value="+4917610007550" target="_blank">0176 1000 75 50
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Am 21.01.2016 um 15:57 schrieb Robert Metzger <[hidden email]>:

Hi Max,

is the distinct() operation reducing the size of the DataSet? If so, I assume you have an idempotent update and the job is faster because fewer updates are done?
if the distinct() operator is not changing anything, then, the job might be faster because the INSERT is done while Flink is still executing the distinct() operation. So the insert is over when the updates are starting. This would mean that concurrent inserts and updates on the database are much slower than doing this sequentially.

I'm wondering if there is a way in Flink to explicitly ask for spilling an intermediate operator to "pause" execution:

Source ----- > (spill for pausing) ---> (update sink)
        \
         ------- > (insert)

I don't have a lot of practical experience with RDBMS, but I guess updates are slower because an index lookup + update is necessary. Maybe optimizing the database configuration / schema / indexes is more promising. I think its indeed much nicer to avoid any unnecessary steps in Flink.

Did you do any "microbenchmarks" for the update and insert part? I guess that would help a lot to understand the impact of certain index structures, batching sizes, or database drivers.

Regards,
Robert




On Thu, Jan 21, 2016 at 3:35 PM, Maximilian Bode <[hidden email]> wrote:
Hi everyone,

in a Flink (0.10.1) job with two JDBCOutputFormat sinks, one of them (doing a database update) is performing slower than the other one (an insert). The job as a whole is also slow as upstream operators are slowed down due to backpressure. I am able to speed up the whole job by introducing an a priori unnecessary .distinct(), which of course blocks downstream execution of the slow sink, which in turn seems to be able to execute faster when given all data at once.

Any ideas what is going on here? Is there something I can do without introducing unnecessary computation steps?

Cheers,
Max
— 
Maximilian Bode * Junior Consultant * [hidden email] * <a href="tel:0176%201000%2075%2050" value="+4917610007550" target="_blank">0176 1000 75 50
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082