Hello,
If KeyedCoProcessFunction.processElement1 blocks for a significant amount of time, will it prevent checkpoints?
Thanks,
Alexey
Hi Alexey
Actually, I don't understand why you think KeyedCoProcessFunction#processElement1 would block for a significant amount of time; it just processes the records arriving on the first input stream, which is necessary work. If you really find that it blocks for a long time, I think your processing logic has a problem that makes it get stuck. On the other hand, since checkpointing and record processing hold the same lock, we cannot take a checkpoint while the record-processing logic has not released the lock.
Best
Yun Tang
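For illustration, a minimal sketch of the pattern under discussion (the class name and the blocking helper are hypothetical, not from this thread): any blocking work inside processElement1 keeps the task thread busy, so the subtask cannot take part in a checkpoint until the call returns.

```java
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: String types keep the example self-contained.
public class BlockingWriter extends KeyedCoProcessFunction<String, String, String, String> {

    @Override
    public void processElement1(String event, Context ctx, Collector<String> out) throws Exception {
        // Hypothetical blocking work done inline with every record: partition
        // creation, purging old rows, then the actual INSERT. While this call
        // runs, the subtask cannot acknowledge a checkpoint barrier, so the
        // checkpoint is delayed until it returns.
        doRdbmsMaintenanceAndWrite(event);
    }

    @Override
    public void processElement2(String control, Context ctx, Collector<String> out) {
        // the second input is handled elsewhere in the real job
    }

    private void doRdbmsMaintenanceAndWrite(String event) throws Exception {
        // JDBC calls elided in this sketch
    }
}
```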
Thank you Yun Tang.
My implementation could potentially block for a significant amount of time, because I wanted to do RDBMS maintenance (creating partitions for new data, purging old data, etc.) inline with writing the stream data to a database.
Hi Alexey
If possible, I think you could move some of the RDBMS maintenance operations to the #open method of RichFunction to reduce the chance of blocking record processing.
Best
Yun Tang
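A rough sketch of that suggestion (the helper methods are hypothetical): the one-off maintenance moves into open(), so per-record processing stays short.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: run the maintenance once per task in open(), not per record.
public class MaintenanceInOpen extends KeyedCoProcessFunction<String, String, String, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once when the task starts (and again after a restart), before
        // any record is processed, so it does not delay checkpoints between records.
        createPartitionsAndPurgeOldData(); // hypothetical maintenance routine
    }

    @Override
    public void processElement1(String event, Context ctx, Collector<String> out) throws Exception {
        writeToDatabase(event); // only the per-record write remains here
    }

    @Override
    public void processElement2(String control, Context ctx, Collector<String> out) {
        // the second input is handled elsewhere in the real job
    }

    private void createPartitionsAndPurgeOldData() { /* JDBC elided */ }

    private void writeToDatabase(String event) { /* JDBC elided */ }
}
```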
Hi Alexey,

We cannot perform a checkpoint on a UDF that is still being called, as we would not be able to take a consistent snapshot. You could potentially have changed the state, so if we replay the event during recovery, you may get inexact results. For example, consider a simple counter where you just count all data coming in from input1: if we checkpointed during the UDF invocation, you could already have incremented the counter, so upon recovery you would count that input record twice. This conceptual inability to checkpoint during a UDF call is completely independent of the implementation constraints that Yun outlined; much of that has changed in 1.10 and will further improve in 1.11.

So coming back to your question: it's generally a bad idea to do heavy computation within one UDF call. If that maintenance work needs to be done prior to any record processing, `open` sounds more plausible. I'd even consider doing that work in your `main` if it doesn't need to be redone during recovery. If you need access to the records (to create new partitions), I'd go with a prepended AsyncIO stage. It's specifically built around interactions with external systems and supports intermediate checkpoints (at the cost that users may not use user state).

-- Arvid
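For illustration, a rough sketch of the prepended AsyncIO stage Arvid describes (the class name, maintenance call, and timeout/capacity values are hypothetical): the blocking JDBC work runs in a future, so the task thread stays free and checkpoints can complete while requests are in flight.

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Sketch only: an async stage that performs the slow RDBMS maintenance and then
// passes the record through unchanged to the downstream KeyedCoProcessFunction.
public class MaintenanceAsyncFunction extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String event, ResultFuture<String> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> {
                doRdbmsMaintenance(event); // hypothetical blocking JDBC work
                return event;              // pass the record through unchanged
            })
            .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
    }

    private void doRdbmsMaintenance(String event) { /* JDBC elided */ }
}

// Wiring it in front of the co-process function, e.g.:
//   DataStream<String> prepared = AsyncDataStream.unorderedWait(
//           input1, new MaintenanceAsyncFunction(), 60, TimeUnit.SECONDS, 100);
//   prepared.connect(input2).keyBy(keySelector1, keySelector2).process(...);
```

In practice you would hand the blocking call to a dedicated executor or connection pool rather than the common ForkJoinPool, and size the AsyncIO timeout and capacity to what your database can handle.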
Would the AsyncIO operator not be an option for you to connect to the RDBMS?
Taher Koitawala
Yes, using AsyncIO could help in the case of blocking operations.
Cheers,
Till