Async Function Not Generating Backpressure


Seed Zeng
Flink Version - 1.6.1

In our application, we consume from Kafka and ultimately sink to Cassandra. We are trying to introduce a custom async function in front of the sink to carry out some customized operations. In our testing, the async function does not appear to generate backpressure that slows down our Kafka source when Cassandra becomes unhappy. Compared to an almost identical job whose only difference is the absence of the async function, Kafka source consumption is much faster under the same settings and against the same Cassandra cluster. The experiment was set up as follows:

Job 1 - without async function in front of Cassandra
Job 2 - with async function in front of Cassandra

Job 1 is backpressured because Cassandra cannot handle all the writes, which eventually slows the source rate down to 6.5k/s.
Job 2 is only slightly backpressured and is able to run at 14k/s.

Is the AsyncFunction somehow not reporting the backpressure correctly?

Thanks,
Seed

Re: Async Function Not Generating Backpressure

Andrey Zagrebin-3
Hi Seed,

I think the back pressure should emerge from the AsyncFunction.asyncInvoke call blocking.
So it depends on how the ResultFuture is produced from the Cassandra client: does submitting a request block when the number of pending requests is too large, or not?
Maybe AsyncFunction.asyncInvoke needs additional throttling when submitting requests to Cassandra.
See also [1] for a Cassandra sink with throttling.
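The throttling idea above can be sketched in plain Java (no Flink dependency) as a semaphore that caps the number of outstanding requests, so the submitting call blocks once the cap is reached. `ThrottledSubmitter`, `maxPending`, and `wouldBlock` are hypothetical names chosen for illustration. Calling `submit` from inside `asyncInvoke` and completing the `ResultFuture` from the returned future is one assumed way to wire it up, not a documented Flink pattern.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not a Flink class): caps the number of in-flight async
 * requests. Once maxPending requests are outstanding, submit() blocks the caller,
 * which is the kind of "blocking asyncInvoke" that would push back upstream.
 */
final class ThrottledSubmitter {
    private final Semaphore permits;
    private final Executor executor;

    ThrottledSubmitter(int maxPending, Executor executor) {
        this.permits = new Semaphore(maxPending);
        this.executor = executor;
    }

    /** Blocks until a slot is free, then runs the request asynchronously. */
    <T> CompletableFuture<T> submit(Supplier<T> request) throws InterruptedException {
        permits.acquire(); // waits while maxPending requests are still pending
        try {
            return CompletableFuture.supplyAsync(request, executor)
                    // Free the slot once the request finishes, successfully or not.
                    .whenComplete((result, error) -> permits.release());
        } catch (RuntimeException e) {
            permits.release(); // the executor rejected the task; give the slot back
            throw e;
        }
    }

    /** True if the next submit() call would have to wait for a free slot. */
    boolean wouldBlock() {
        return permits.availablePermits() == 0;
    }
}
```

Blocking in `submit` rather than failing fast is the deliberate choice in this sketch: a caller that waits is what would turn a saturated backend into backpressure.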

Best,
Andrey



In reply to Seed Zeng's message of Tue, Mar 19, 2019 at 3:48 AM.

Re: Async Function Not Generating Backpressure

Ken Krugler
In reply to this post by Seed Zeng
Hi Seed,

It’s a known issue that Flink doesn’t report back pressure properly for AsyncFunctions, due to how it monitors the output collector to gather back pressure statistics.

But that wouldn’t explain how you get faster processing with the AsyncFunction inserted into your workflow.

I haven’t looked at how the Cassandra sink handles batching, but if the AsyncFunction somehow caused fewer, bigger Cassandra writes to happen, then that’s one (serious hand-waving) explanation.

— Ken


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: Async Function Not Generating Backpressure

Seed Zeng
Hi Ken and Andrey,

Thanks for the response. I think there is some confusion here: the writes to Cassandra do not happen within the async function.
In my test, the async function is just a pass-through that does no work.

So Cassandra-related batching or buffering should not be the cause of this.

Thanks,

Seed

In reply to Ken Krugler's message of Tue, Mar 19, 2019 at 12:35 PM.

Re: Async Function Not Generating Backpressure

Ken Krugler
Hi Seed,

I was assuming the Cassandra sink was separate from and after your async function.

I was trying to come up with an explanation as to why adding the async function would improve your performance.

The only (very unlikely) reason I could think of was that the async function somehow caused data to arrive at the sink in a more “batchy” way, which could improve performance if the Cassandra sink had an “every x seconds, do a write” batch mode.

— Ken

In reply to Seed Zeng's message of Mar 19, 2019 at 11:35 AM.


Re: Async Function Not Generating Backpressure

Andrey Zagrebin-3
Hi Seed,

Sorry for the confusion; I see now that the sink is separate. Back pressure should still be created, because the internal async queue has a bounded capacity,
but I am not sure about the reporting problem. Ken and Till probably have a better idea.

As for the consumption speed-up: the async operator creates another thread to collect the results, and the Cassandra sink probably uses that thread to write data.
This might parallelize and pipeline steps such as Kafka fetching and Cassandra IO, but I am not sure about this explanation either.
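The claim that a bounded queue still produces backpressure can be modeled with a plain JDK `ArrayBlockingQueue`. This is a simplified stand-in, not Flink's actual `StreamElementQueue`, and `BoundedInFlightQueue` and `acceptedBeforeStall` are hypothetical names. A producer can push at most `capacity` elements before a blocking `put` would stall it; in this model, that stall is what would propagate upstream.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical stand-in for an async operator's bounded in-flight queue. */
final class BoundedInFlightQueue<T> {
    private final BlockingQueue<T> queue;

    BoundedInFlightQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Blocking insert: a full queue stalls the caller (the upstream side). */
    void put(T element) throws InterruptedException {
        queue.put(element);
    }

    /** Non-blocking probe: returns false instead of stalling when full. */
    boolean tryPut(T element) {
        return queue.offer(element);
    }

    /** Models a result being emitted downstream, which frees one slot. */
    T completeOldest() {
        return queue.poll();
    }

    /** How many elements a producer can push before it would have to stall. */
    static int acceptedBeforeStall(int capacity, int attempts) {
        BoundedInFlightQueue<Integer> q = new BoundedInFlightQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (!q.tryPut(i)) {
                break; // a blocking put() would wait here: backpressure
            }
            accepted++;
        }
        return accepted;
    }
}
```

For example, `acceptedBeforeStall(100, 1000)` returns 100: with nothing draining the queue, the producer gets exactly one capacity's worth of elements ahead and then stops.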

Best,
Andrey


In reply to Ken Krugler's message of Tue, Mar 19, 2019 at 8:05 PM.

Re: Async Function Not Generating Backpressure

Seed Zeng
Hey Andrey and Ken,
Sorry about the late reply. I might not have been clear in my question.
The Cassandra write performance is the same in both cases; the only difference is that the source rate was higher when the async function was present.
Something is "buffering" and not propagating backpressure to slow down consumption from Kafka.

In our use case, we want backpressure to slow down the source, so that writes to Cassandra are not delayed while the source is consuming quickly.

Thanks,
Seed

In reply to Andrey Zagrebin's message of Wed, Mar 20, 2019 at 9:38 AM.

Re: Async Function Not Generating Backpressure

Andrey Zagrebin-3
Hi Seed,

When you create `AsyncDataStream.(un)orderedWait`, which capacity do you pass in? Or do you use the default one (100)?

Best,
Andrey

In reply to Seed Zeng's message of Thu, Mar 21, 2019 at 2:49 AM.

Re: Async Function Not Generating Backpressure

Ken Krugler
In reply to this post by Seed Zeng

On Mar 20, 2019, at 6:49 PM, Seed Zeng wrote:

> Hey Andrey and Ken,
> Sorry about the late reply. I might not have been clear in my question
> The performance of writing to Cassandra is the same in both cases, only that the source rate was higher in the case of the async function is present.

OK, I was confused by what you’d originally written...

> Job 1 is backpressured because Cassandra cannot handle all the writes and eventually slows down the source rate to 6.5k/s.
> Job 2 is slightly backpressured but was able to run at 14k/s.

If the source rate is _temporarily_ higher, then that might make sense, as the async function will be able to buffer up to the configured capacity.


AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

The capacity is 100 (which is also the default if you don’t specify it).

> Something is "buffering" and not propagating backpressure to slow down the source speed from Kafka.
>
> In our use case, we prefer the backpressure to slow down the source so that the write to Cassandra is not delayed while the source is consuming fast.

You can use a smaller capacity to reduce the impact, but that could obviously hurt the performance of whatever you’re using the async function to parallelize.
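The point that buffering only allows a temporary lead can be illustrated with a tick-based toy model in which a fast source feeds a slow sink through a buffer of fixed capacity. `SourceLeadModel` and its rate parameters are hypothetical, and the model ignores everything else in a real pipeline (network buffers, Kafka fetch batching, parallelism). In this model the source's long-run rate converges to the sink's rate whatever the capacity; the capacity only bounds how far the source runs ahead.

```java
/**
 * Hypothetical toy model: a source that could emit sourceRate elements per tick
 * feeds a sink that drains sinkRate elements per tick through a bounded buffer.
 */
final class SourceLeadModel {
    /** Returns {emittedBySource, drainedBySink} after the given number of ticks. */
    static long[] simulate(int capacity, int sourceRate, int sinkRate, int ticks) {
        long emitted = 0;
        long drained = 0;
        int buffered = 0;
        for (int t = 0; t < ticks; t++) {
            // The source emits only while there is room; a full buffer blocks it.
            int produced = Math.min(sourceRate, capacity - buffered);
            buffered += produced;
            emitted += produced;
            // The sink drains as much as it can this tick.
            int consumed = Math.min(sinkRate, buffered);
            buffered -= consumed;
            drained += consumed;
        }
        return new long[] {emitted, drained};
    }
}
```

For example, `simulate(100, 14, 6, 1000)` returns `{6094, 6000}`: the source ends only 94 elements ahead of the sink, and with a capacity of 10 the lead shrinks to 4. Under this model's assumptions, a bounded buffer alone cannot sustain a higher source rate; it only produces a bounded head start, and a smaller capacity makes that head start smaller.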

Regards,

— Ken


Re: Async Function Not Generating Backpressure

Till Rohrmann
I think Seed is correct that we don't properly report backpressure from an AsyncWaitOperator. The problem is that the elements are emitted not by the Task's main execution thread but by the Emitter thread, so it is the Emitter thread that gets stuck in the `requestBufferBuilderBlocking` method. This, however, does not mean that the AsyncWaitOperator does not generate backpressure. In fact, if your `StreamElementQueue` is full, then it should generate backpressure. The upstream task should eventually report backpressure, which should be visible in the web UI.
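The reporting gap described above can be illustrated with a toy model of stack-trace sampling, in which a thread counts as backpressured only if its stack contains a `requestBufferBuilderBlocking` frame. Everything here is hypothetical plain JDK code: the method below merely borrows that name and parks, and the assumption (following the explanation above) is that the monitor samples only the task's main thread. If a separate emitter thread is the one that is stuck, sampling the main thread finds nothing.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical model of stack-trace-based backpressure detection. */
final class BackPressureSamplerModel {
    static final String BLOCKING_METHOD = "requestBufferBuilderBlocking";

    /** True if the sampled thread is currently inside the blocking method. */
    static boolean looksBackPressured(Thread thread) {
        for (StackTraceElement frame : thread.getStackTrace()) {
            if (BLOCKING_METHOD.equals(frame.getMethodName())) {
                return true;
            }
        }
        return false;
    }

    /** Stand-in for waiting on an output buffer that never becomes available. */
    static void requestBufferBuilderBlocking(CountDownLatch entered) {
        entered.countDown(); // signal that this frame is now on the stack
        try {
            new CountDownLatch(1).await(); // never released: parks until interrupted
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Starts a model "emitter" thread that is stuck waiting for a buffer. */
    static Thread startStuckEmitter() throws InterruptedException {
        CountDownLatch entered = new CountDownLatch(1);
        Thread emitter = new Thread(() -> requestBufferBuilderBlocking(entered), "emitter-model");
        emitter.setDaemon(true);
        emitter.start();
        entered.await(); // from here on, the blocking frame stays on the emitter's stack
        return emitter;
    }
}
```

Sampling the stuck emitter thread reports it as backpressured, while sampling the calling thread does not, even though the model pipeline is blocked.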

If you want, you could open a JIRA issue for this problem and try to fix it.

Cheers,
Till

In reply to Ken Krugler's message of Thu, Mar 21, 2019 at 4:41 PM.
