Blocking KeyedCoProcessFunction.processElement1

Alexey Trenikhun

Hello,
If KeyedCoProcessFunction.processElement1 blocks for a significant amount of time, will it prevent checkpointing?

Thanks,
Alexey

Re: Blocking KeyedCoProcessFunction.processElement1

Yun Tang
Hi Alexey

Actually, I don't understand why you think KeyedCoProcessFunction#processElement1 would block for a significant amount of time; it just processes records from the first input stream, which is necessary. If you really find that it blocks for a long time, I think that is because your processing logic gets stuck somewhere. On the other hand, since checkpoint processing and record processing hold the same lock, we cannot process a checkpoint until the record processing logic releases the lock.
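
The lock behaviour described above can be illustrated with a small plain-Java simulation. This is only a sketch under assumptions, not Flink's actual task code: the class `CheckpointLockSketch`, its methods and the use of a `ReentrantLock` are hypothetical stand-ins for "record processing and checkpointing share one lock".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

final class CheckpointLockSketch {
    // Hypothetical stand-in for the lock shared by record processing and checkpointing.
    private final ReentrantLock checkpointLock = new ReentrantLock();

    // Simulates a processElement1 call that keeps the lock until 'release' is counted down.
    Thread startBlockingRecord(CountDownLatch entered, CountDownLatch release) {
        Thread t = new Thread(() -> {
            checkpointLock.lock();
            try {
                entered.countDown();   // signal: the record is now being processed
                release.await();       // the "slow" user code, e.g. RDBMS maintenance
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                checkpointLock.unlock();
            }
        });
        t.start();
        return t;
    }

    // Simulates a checkpoint attempt; it succeeds only if the lock is free.
    boolean tryCheckpoint() {
        if (!checkpointLock.tryLock()) {
            return false;
        }
        checkpointLock.unlock();
        return true;
    }

    // Returns {checkpoint possible during the call, checkpoint possible after the call}.
    static boolean[] demo() {
        CheckpointLockSketch task = new CheckpointLockSketch();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            Thread record = task.startBlockingRecord(entered, release);
            entered.await();                        // wait until the record holds the lock
            boolean during = task.tryCheckpoint();  // attempted while user code is blocked
            release.countDown();                    // user code returns
            record.join();
            boolean after = task.tryCheckpoint();   // attempted after the lock was released
            return new boolean[] {during, after};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

In this simulation the checkpoint attempt fails for as long as the simulated processElement1 call holds the lock, and succeeds once the call has returned.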

Best
Yun Tang

In reply to the post by Alexey Trenikhun, sent Thursday, January 23, 2020 13:04.

Re: Blocking KeyedCoProcessFunction.processElement1

Alexey Trenikhun
Thank you Yun Tang.
My implementation could potentially block for a significant amount of time, because I wanted to do RDBMS maintenance (create partitions for new data, purge old data, etc.) in-line with writing stream data to a database.


In reply to the post by Yun Tang, sent Sunday, January 26, 2020 8:42:37 AM.

Re: Blocking KeyedCoProcessFunction.processElement1

Yun Tang
Hi Alexey

If possible, I think you could move some of the RDBMS maintenance operations to the #open method of RichFunction, to reduce the chance of blocking record processing.

Best
Yun Tang

In reply to the post by Alexey Trenikhun, sent Tuesday, January 28, 2020 15:15.

Re: Blocking KeyedCoProcessFunction.processElement1

Arvid Heise-3
Hi Alexey,

We cannot perform a checkpoint on a UDF that is still being called, because we would not be able to take a consistent snapshot. The call may already have changed the state, so if we replay the event during recovery, you may get inexact results. For example, consider a simple counter that just counts all records coming in from input1. If we checkpointed during the UDF invocation, the counter could already have been incremented, so that upon recovery that input record would be counted twice.
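
The counter example can be made concrete with a few lines of plain Java. This is a hedged sketch, not Flink code: the class `MidCallCheckpointSketch` and its method are hypothetical, and "snapshot", "failure" and "replay" are simulated with local variables.

```java
final class MidCallCheckpointSketch {
    // Processes three records, snapshots the counter either in the middle of the
    // third call or just before it, then simulates a failure and replays record 3.
    static long countAfterRecovery(boolean snapshotDuringCall) {
        long counter = 0;
        counter += 2;                 // records 1 and 2 were fully processed

        long snapshot;
        if (snapshotDuringCall) {
            counter++;                // record 3: state updated, call not yet finished
            snapshot = counter;       // snapshot already contains record 3
        } else {
            snapshot = counter;       // snapshot taken between two calls
            counter++;                // record 3 is processed afterwards
        }

        // Failure: restore the snapshot. Record 3 was not acknowledged as processed
        // at snapshot time, so it is replayed after recovery.
        long restored = snapshot;
        restored++;                   // replay of record 3
        return restored;
    }
}
```

With three input records, `countAfterRecovery(false)` yields 3, while `countAfterRecovery(true)` yields 4, because record 3 is counted once in the snapshot and once more on replay.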

This conceptual inability to checkpoint during a UDF call is completely independent of the implementation constraints that Yun outlined; much of that has changed in 1.10 and will further improve in 1.11.

So coming back to your question, it's generally a bad idea to do heavy computation within one UDF call. If that maintenance work needs to be done prior to any record processing, `open` sounds more plausible. I'd even consider doing that work in your `main` if it doesn't need to be redone during recovery.
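
As a rough illustration of moving the maintenance out of the per-record path, here is a plain-Java sketch. It only mimics the lifecycle (one `open` call, then one call per record); the class `MaintenanceInOpenSketch` and the log strings are hypothetical, and no Flink classes are used.

```java
import java.util.ArrayList;
import java.util.List;

final class MaintenanceInOpenSketch {
    // Records what happened, in order, so the call sequence can be inspected.
    final List<String> log = new ArrayList<>();

    // Called once before any record is processed, in the spirit of RichFunction#open.
    void open() {
        log.add("maintenance");       // e.g. create partitions, purge old data
    }

    // Called once per record; only the cheap write remains here.
    void processElement1(String record) {
        log.add("write:" + record);
    }

    // Drives the lifecycle the way a runtime would: open first, then the records.
    static List<String> run(List<String> records) {
        MaintenanceInOpenSketch fn = new MaintenanceInOpenSketch();
        fn.open();
        for (String r : records) {
            fn.processElement1(r);
        }
        return fn.log;
    }
}
```

Note that a real `open` is also invoked again when the job is restored, which is why work that must not be repeated on recovery is better placed in `main`, as suggested above.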

If you need access to the records (to create new partitions), I'd go with an asyncIO operator prepended to your function. It's specifically built around interactions with external systems and supports intermediate checkpoints (at the cost that user state may not be used in it).
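
The idea behind asynchronous I/O can be sketched in plain Java with `CompletableFuture`. This is not Flink's API (there, the corresponding pieces are `AsyncFunction`, `ResultFuture` and `AsyncDataStream`); the class `AsyncMaintenanceSketch`, the method `ensurePartition` and the 50 ms delay are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncMaintenanceSketch {
    // Hypothetical stand-in for a slow RDBMS call, e.g. creating a partition for a record.
    static String ensurePartition(String record) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return record + ":partition-ready";
    }

    // Submits every record to a separate pool and collects the results in input order.
    static List<String> processAll(List<String> records) {
        ExecutorService dbPool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> inFlight = new ArrayList<>();
            for (String r : records) {
                // supplyAsync returns immediately; the slow call runs on dbPool,
                // so the submitting thread is free to accept the next record.
                inFlight.add(CompletableFuture.supplyAsync(() -> ensurePartition(r), dbPool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> f : inFlight) {
                results.add(f.join());   // wait for completion, preserving input order
            }
            return results;
        } finally {
            dbPool.shutdown();
        }
    }
}
```

The point of the pattern is that the thread handing records to the operator never waits on the database itself; only the futures do.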

-- Arvid

In reply to the post by Yun Tang, sent Tue, Jan 28, 2020 at 8:50 AM.

Re: Blocking KeyedCoProcessFunction.processElement1

taher koitawala-2
In reply to this post by Alexey Trenikhun
Would the AsyncIO operator not be an option for connecting to the RDBMS?


Re: Blocking KeyedCoProcessFunction.processElement1

Till Rohrmann
Yes, using AsyncIO could help in the case of blocking operations.

Cheers,
Till

In reply to the post by Taher Koitawala, sent Tue, Jan 28, 2020 at 10:45 AM.