Async Datastream Checkpointing

Async Datastream Checkpointing

Alexandru Vasiu (Wed, Feb 26, 2020 at 2:27 PM)
Hi,

We have a pipeline with a step that uses AsyncDataStream.unorderedWait to execute web requests. The pipeline works, but every checkpoint fails with a timeout error (it stops at the operator containing this async data stream). We are using Flink 1.10.0 with Scala 2.12.10 and this checkpoint configuration:

"checkpoints_interval": 180000, 
"min_pause_between_checkpoints": 10000, 
"checkpoints_timeout": 600000, 
"tolerable_checkpoints_failure_number": 20, 
"max_concurrent_checkpoints": 1,
"checkpoint_mode": CheckpointingMode.EXACTLY_ONCE

Do you know why checkpointing doesn't work in this case?

Thank you,
Alex Vasiu

ComplyAdvantage is a trading name of IVXS TECHNOLOGY ROMANIA. This message, including any attachments, is intended only for the use of the individual(s) to whom it is addressed and may contain information that is strictly privileged/confidential. Any other distribution, copying or disclosure is strictly prohibited. If you are not the intended recipient or have received this message in error, please notify the sender immediately by reply email and permanently delete this message including any attachments, without reading it or making a copy. Contact us. Website.
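For reference, here is a minimal sketch of how the settings above could be applied in a Flink 1.10 Scala job. It assumes each key corresponds to the CheckpointConfig setter named in its comment. The keys look like the poster's own configuration format rather than Flink option names, and CheckpointSetup is a hypothetical name.

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical helper; the key-to-setter mapping in the comments is an assumption.
object CheckpointSetup {
  def configure(env: StreamExecutionEnvironment): Unit = {
    // checkpoints_interval = 180000 ms, checkpoint_mode = EXACTLY_ONCE
    env.enableCheckpointing(180000L, CheckpointingMode.EXACTLY_ONCE)
    val cfg = env.getCheckpointConfig
    cfg.setMinPauseBetweenCheckpoints(10000L)   // min_pause_between_checkpoints
    cfg.setCheckpointTimeout(600000L)           // checkpoints_timeout
    cfg.setTolerableCheckpointFailureNumber(20) // tolerable_checkpoints_failure_number
    cfg.setMaxConcurrentCheckpoints(1)          // max_concurrent_checkpoints
  }
}
```

With max_concurrent_checkpoints set to 1, a checkpoint that never completes keeps the single slot until the 600 s timeout expires, so the next attempt cannot start before then. That is consistent with every checkpoint ending in a timeout rather than failing quickly.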

Re: Async Datastream Checkpointing

Arvid Heise (Wed, Feb 26, 2020 at 5:21 PM)
Hi Alexandru,

The most likely reason is that you are using AsyncDataStream incorrectly: you have to ensure that all of the work is done in a separate thread.

Async I/O only guarantees that asynchronous results are merged back into the synchronous stream; it does not move your work to another thread. The reason is that many libraries have their own thread pool for sending async requests, and we didn't want to duplicate that.
The easiest approach is to create an executor with Executors.newFixedThreadPool(10), submit the requests to it, and then feed the results back.
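A minimal sketch of that pattern follows. It uses only the Scala and Java standard libraries, so it runs without Flink. ResultFuture here is a local stand-in with the same complete/completeExceptionally methods as Flink's Scala ResultFuture. WebRequestFunction and fetch are hypothetical names. In a real job the class would extend AsyncFunction (or RichAsyncFunction, to get open()/close()), create the pool in open() and shut it down in close().

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Local stand-in mirroring the two methods of Flink's Scala ResultFuture.
trait ResultFuture[OUT] {
  def complete(result: Iterable[OUT]): Unit
  def completeExceptionally(error: Throwable): Unit
}

// Hypothetical async lookup: `fetch` stands in for the blocking web request.
class WebRequestFunction(fetch: String => String, poolSize: Int = 10) {
  // Pool owned by the function. In Flink, create it in open() instead, because
  // executors are not serializable.
  private val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  // Returns immediately: the blocking call runs on `pool`, not on the caller's thread,
  // and the result (or error) is fed back through the callback.
  def asyncInvoke(url: String, resultFuture: ResultFuture[String]): Unit =
    Future(fetch(url)).onComplete {
      case Success(body)  => resultFuture.complete(Iterable(body))
      case Failure(error) => resultFuture.completeExceptionally(error)
    }

  def close(): Unit = pool.shutdown()
}
```

The design choice is that asyncInvoke only schedules the request and registers a callback. The calling thread is never held for the duration of the request. In Flink, that calling thread is the task thread, which also processes checkpoint barriers.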

Re: Async Datastream Checkpointing

Alexandru Vasiu (Thu, Feb 27, 2020 at 5:30 PM; the same text was first sent at 4:24 PM)
Hi,

We tried using ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) as the executor and we still have this problem. We also tried other executors, with the same result. We managed to complete a checkpoint only once, when we tried without the web requests.
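One detail worth ruling out: newFixedThreadPool(1) has exactly one worker thread. If the web requests are blocking calls executed on that pool, at most one request is in flight at a time, whatever capacity is passed to unorderedWait. The sketch below uses the standard library only. runRequests is an illustrative helper, and the 50 ms sleep stands in for a web request. It shows every request landing on the same thread, one after another.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Illustrative helper: runs `n` simulated blocking requests of `latencyMs` each on a
// fixed pool of `threads` workers and returns (distinct worker threads used, elapsed ms).
def runRequests(threads: Int, n: Int, latencyMs: Long): (Int, Long) = {
  val pool = Executors.newFixedThreadPool(threads)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  val workers = ConcurrentHashMap.newKeySet[String]()
  val start = System.nanoTime()
  val requests = (1 to n).map { _ =>
    Future {
      workers.add(Thread.currentThread().getName)
      Thread.sleep(latencyMs) // stands in for a slow web request through a proxy
    }
  }
  Await.result(Future.sequence(requests), 1.minute)
  pool.shutdown()
  (workers.size(), (System.nanoTime() - start) / 1000000L)
}
```

With a single thread the four simulated requests take at least four times the per-request latency. With 3-4 second requests, that one thread becomes the bottleneck. A larger pool, or a client whose calls are genuinely non-blocking, would lift that limit.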

Do you have any other ideas?

Thank you,
Alex

Re: Async Datastream Checkpointing

Arvid Heise
Hi Alexandru,

I cannot follow what you posted, so let me point you to one example [1]. The executor needs to be used inside the async function.
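Assuming "used inside the async function" means that the request itself must be submitted to the executor from within asyncInvoke, the difference can be sketched as two versions of asyncInvoke. Both deliver the result through the callback. Only the second hands the request to the executor. In the first, the request runs on the calling thread before asyncInvoke returns. ResultFuture is a local stand-in for Flink's Scala ResultFuture, and Invokers and fetch are hypothetical names.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Local stand-in mirroring Flink's Scala ResultFuture (complete / completeExceptionally).
trait ResultFuture[OUT] {
  def complete(result: Iterable[OUT]): Unit
  def completeExceptionally(error: Throwable): Unit
}

object Invokers {
  // Anti-pattern: fetch(url) runs on the calling thread, so this does not return until
  // the request finishes. Future.successful(fetch(url)) or Await.result inside
  // asyncInvoke block in the same way, because the request still runs here.
  def blockingInvoke(fetch: String => String)(url: String, rf: ResultFuture[String]): Unit =
    rf.complete(Iterable(fetch(url)))

  // Intended pattern: the request is submitted to the executor behind `ec`, and
  // this returns as soon as the callback is registered.
  def nonBlockingInvoke(fetch: String => String)(url: String, rf: ResultFuture[String])(
      implicit ec: ExecutionContext): Unit =
    Future(fetch(url)).onComplete {
      case Success(body)  => rf.complete(Iterable(body))
      case Failure(error) => rf.completeExceptionally(error)
    }
}
```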




Re: Async Datastream Checkpointing

Alexandru Vasiu (Fri, Feb 28, 2020 at 9:16 AM)
Hi,

That's how we used the executor. I think the problem is that the web requests take too long to complete (3-4 seconds) because they go through a proxy server. I also replaced the async data stream with a flatMap and got the same result (no successful checkpoint). If I use a simple web page without the proxy server, checkpointing works.
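To put the 3-4 second latency in numbers: by Little's law, sustainable throughput is at most (requests in flight) / (latency). A single blocking thread, whether a newFixedThreadPool(1) executor or a synchronous flatMap, therefore handles at most about 0.25-0.33 records per second. With exactly-once checkpoints in Flink 1.10, barriers travel in order with the records, so a barrier queued behind a backlog waits until that backlog is processed. Whether that explains the timeouts here is not confirmed in the thread. The helper names and the 100-record backlog are illustrative assumptions.

```scala
// Little's law bound: sustainable throughput <= (requests in flight) / latency.
// `threads` is the number of threads doing blocking calls (1 for newFixedThreadPool(1)
// or a synchronous flatMap); `capacity` is the unorderedWait capacity.
def maxRecordsPerSecond(threads: Int, capacity: Int, latencySeconds: Double): Double =
  math.min(threads, capacity) / latencySeconds

// Seconds needed to work through `backlog` records queued ahead of a barrier.
def drainSeconds(backlog: Int, threads: Int, capacity: Int, latencySeconds: Double): Double =
  backlog / maxRecordsPerSecond(threads, capacity, latencySeconds)

val single = maxRecordsPerSecond(threads = 1, capacity = 100, latencySeconds = 4.0)  // 0.25
val pooled = maxRecordsPerSecond(threads = 10, capacity = 100, latencySeconds = 4.0) // 2.5
val backlogWait = drainSeconds(backlog = 100, threads = 1, capacity = 100, latencySeconds = 4.0) // 400.0
```

In the single-thread case, a 100-record backlog alone needs about 400 s, a large share of the 600 s checkpoint timeout, before counting any records buffered in network channels.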

Alex

Re: Async Datastream Checkpointing

Arvid Heise
Hi Alexandru,

Please share the code of your AsyncFunction. The behaviour you observe is not at all in line with how things should behave.

As long as you are not blocking AsyncFunction#asyncInvoke, checkpointing will work.
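One way to check this against a concrete AsyncFunction is a small test harness. It gives the invocation a request that cannot finish until a gate is opened and checks whether the call returns first. The sketch below uses only the standard library. isNonBlocking and the time limits are hypothetical. In a real test, the invoke argument would call your function's asyncInvoke with a stubbed HTTP client that performs the supplied request.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical test helper. `invoke` receives a simulated request (a function that
// blocks until a gate opens) and must start it. Returns true if `invoke` returns
// within `limitMs` while that request is still pending, i.e. it did not block.
def isNonBlocking(limitMs: Long)(invoke: (() => Unit) => Unit): Boolean = {
  val gate = new CountDownLatch(1)
  val returned = new CountDownLatch(1)
  val caller = new Thread(() => {
    invoke(() => gate.await())
    returned.countDown()
  })
  caller.start()
  val ok = returned.await(limitMs, TimeUnit.MILLISECONDS)
  gate.countDown() // release the simulated request so no thread stays stuck
  caller.join()
  ok
}
```

Submitting the request to an executor passes the check. Running it directly on the calling thread (as a synchronous flatMap does by design) fails it.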
