I was just thinking about letting a CoProcessFunction "block", or cause backpressure on, one of its streams. Has this been discussed as an option? Does anyone know a way to effectively accomplish this? I think I could get a lot of mileage out of something like that without needing a full implementation of FLIP-17 (side inputs for the DataStream API), which I would still eagerly await. As mentioned on another thread, one could use a ListState to buffer the main input until the "side input" has been sufficiently processed. However, the downside is that I have no way to control the size of those buffers, whereas with backpressure the system naturally takes care of it.
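To show where the unbounded buffer in the ListState workaround comes from, here is a minimal plain-Java model of that logic. It has no Flink dependencies. `BufferingEnricher` and its methods are hypothetical names. In a real Flink job the list and the map would be keyed state inside a CoProcessFunction. "Sufficiently processed" would also need some concrete signal, which is assumed here to be an explicit `markSideReady()` call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the ListState buffering workaround (hypothetical class).
// In Flink, sideTable and pendingMain would be keyed state in a CoProcessFunction.
class BufferingEnricher {
    private final Map<String, String> sideTable = new HashMap<>(); // "side input" contents
    private final List<String> pendingMain = new ArrayList<>();     // stand-in for ListState
    private boolean sideReady = false;

    // Role of processElement2: absorb side-input records.
    void processSide(String key, String value) {
        sideTable.put(key, value);
    }

    // Assumed signal that the side input is "sufficiently processed": flush the buffer.
    List<String> markSideReady() {
        sideReady = true;
        List<String> out = new ArrayList<>();
        for (String key : pendingMain) {
            out.add(enrich(key));
        }
        pendingMain.clear();
        return out;
    }

    // Role of processElement1: buffer main records until the side input is ready.
    List<String> processMain(String key) {
        List<String> out = new ArrayList<>();
        if (sideReady) {
            out.add(enrich(key));
        } else {
            pendingMain.add(key); // grows without bound: nothing slows the producer down
        }
        return out;
    }

    int bufferedCount() {
        return pendingMain.size();
    }

    private String enrich(String key) {
        return key + "=" + sideTable.getOrDefault(key, "?");
    }
}
```

Every main record that arrives before `markSideReady()` ends up in `pendingMain`. Nothing in this logic pushes back on the upstream, and that is exactly the buffer-size concern described above.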
Maybe it could work with Flink 1.5's credit-based flow control. But you would need a way to express the state "block one input side of the CoProcessFunction", pass this information up to the input gate, and handle it there, probably similarly to how `org.apache.flink.streaming.runtime.io.CachedBufferBlocker` blocks inputs during checkpoint barrier alignment. You cannot just block inside the `processElement1` method: both inputs are processed by the same task thread, so blocking there would stall the other input as well.
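To make the "block at the input gate, not inside processElement1" point concrete, here is a toy plain-Java model with no Flink dependencies. `TwoInputTask`, `END_OF_SIDE` and `demo` are hypothetical names, and the bounded queues only stand in for network buffers and credits. One task thread reads two bounded channels. While the side input is unfinished, it simply does not read the main channel, so the main producer blocks once that channel is full. This sketches the idea only; it does not describe how Flink's input gate is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model of blocking one input "at the gate" rather than inside the
// user function. Bounded queues stand in for network buffers/credits.
class TwoInputTask {
    static final String END_OF_SIDE = "<end-of-side>"; // hypothetical end marker

    final BlockingQueue<String> mainChannel = new ArrayBlockingQueue<>(2); // deliberately tiny
    final BlockingQueue<String> sideChannel = new ArrayBlockingQueue<>(2);
    final List<String> processed = new ArrayList<>();

    // Runs on the single task thread. Both inputs are handled here, so if
    // handling a main record waited for the side input, sideChannel would
    // never be drained and the task would deadlock.
    void run(int mainCount) throws InterruptedException {
        // Phase 1: the main input is "blocked at the gate" and never read.
        // Once mainChannel is full, the main producer blocks in put(), so
        // backpressure propagates upstream instead of a buffer growing.
        while (true) {
            String side = sideChannel.take();
            if (side.equals(END_OF_SIDE)) {
                break;
            }
            processed.add("side:" + side); // role of processElement2
        }
        // Phase 2: unblock the main input.
        for (int i = 0; i < mainCount; i++) {
            processed.add("main:" + mainChannel.take()); // role of processElement1
        }
    }

    // Hypothetical driver: one producer thread per input, the task runs on
    // the calling thread, and the processing order is returned.
    static List<String> demo(int sideCount, int mainCount) {
        TwoInputTask task = new TwoInputTask();
        Thread mainProducer = new Thread(() -> {
            try {
                for (int i = 0; i < mainCount; i++) {
                    task.mainChannel.put("m" + i); // blocks while the channel is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread sideProducer = new Thread(() -> {
            try {
                for (int i = 0; i < sideCount; i++) {
                    task.sideChannel.put("s" + i);
                }
                task.sideChannel.put(END_OF_SIDE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        mainProducer.start();
        sideProducer.start();
        try {
            task.run(mainCount);
            mainProducer.join();
            sideProducer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return task.processed;
    }
}
```

`TwoInputTask.demo(3, 5)` processes the three side records before any of the five main records, even though both producers run concurrently. Meanwhile the main producer sits blocked in `put()` instead of filling an unbounded buffer.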
However, I haven't thought it through, and there may be issues regarding checkpointing. What should happen to checkpoint barriers while one side of the input is blocked? Should that block the checkpoint barrier as well? Should the checkpoint be cancelled?

Piotrek

> On 2 May 2018, at 16:31, Derek VerLee <[hidden email]> wrote:
Thanks for the thoughts, Piotr. It seems I have a talent for asking (nearly) the same question as someone else at the same time, and checkpointing was raised in that thread as well.

I guess one way to conceptualize it is that you have a streaming job with "phases" and transitions between those phases. Maybe there would be a new type of barrier to indicate a change between phases? But now I'm well outside the bounds of hoping for a "quick and dirty" version of a proper side input implementation.

I'm chewing on two new ideas now. The first is using a "union" stream instead of two streams, with a custom source backed by two different sources under the hood, so that the "state machine" logic for transitioning from initialization to normal operation all happens in the same operator. The second is running a batch or "bounded stream" job first to generate a "cache state", and then launching the main streaming job, which loads this initial state in open(). I'm not sure how to work out the keying for that one, though.

I'll post back if I get anywhere with these ideas.
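Here is a rough plain-Java sketch of the first idea. It assumes the custom source can guarantee that every bootstrap record is emitted before any live record. `Tagged`, `SequentialSource` and `PhasedOperator` are hypothetical names, not Flink classes. Iterators stand in for the two underlying sources, and a single operator holds the initialization/normal-operation state machine.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical record type carried by the single "union" stream.
final class Tagged {
    enum Kind { BOOTSTRAP, END_OF_BOOTSTRAP, LIVE }

    final Kind kind;
    final String key;
    final String value; // only meaningful for BOOTSTRAP records

    Tagged(Kind kind, String key, String value) {
        this.kind = kind;
        this.key = key;
        this.value = value;
    }
}

// Model of a custom source backed by two sources: it drains the bootstrap
// source completely, emits one END_OF_BOOTSTRAP marker, then switches to the
// live source. This ordering guarantee is what removes the need to buffer.
final class SequentialSource implements Iterator<Tagged> {
    private final Iterator<Map.Entry<String, String>> bootstrap;
    private final Iterator<String> live;
    private boolean markerEmitted = false;

    SequentialSource(Iterator<Map.Entry<String, String>> bootstrap, Iterator<String> live) {
        this.bootstrap = bootstrap;
        this.live = live;
    }

    @Override
    public boolean hasNext() {
        return bootstrap.hasNext() || !markerEmitted || live.hasNext();
    }

    @Override
    public Tagged next() {
        if (bootstrap.hasNext()) {
            Map.Entry<String, String> e = bootstrap.next();
            return new Tagged(Tagged.Kind.BOOTSTRAP, e.getKey(), e.getValue());
        }
        if (!markerEmitted) {
            markerEmitted = true;
            return new Tagged(Tagged.Kind.END_OF_BOOTSTRAP, null, null);
        }
        if (live.hasNext()) {
            return new Tagged(Tagged.Kind.LIVE, live.next(), null);
        }
        throw new NoSuchElementException();
    }
}

// One operator holding the initialization -> normal-operation state machine.
final class PhasedOperator {
    enum Phase { INITIALIZING, RUNNING }

    private Phase phase = Phase.INITIALIZING;
    private final Map<String, String> table = new HashMap<>();

    // Returns the output for one record, or null if the record emits nothing.
    String process(Tagged t) {
        switch (t.kind) {
            case BOOTSTRAP:
                table.put(t.key, t.value);
                return null;
            case END_OF_BOOTSTRAP:
                phase = Phase.RUNNING;
                return null;
            case LIVE:
                if (phase != Phase.RUNNING) {
                    // Cannot happen with SequentialSource; with a plain union of
                    // two independent sources it could, and buffering would be back.
                    throw new IllegalStateException("live record before bootstrap finished");
                }
                return t.key + "=" + table.getOrDefault(t.key, "?");
            default:
                throw new AssertionError(t.kind);
        }
    }

    // Hypothetical driver: pushes every record of a SequentialSource through the operator.
    static List<String> runAll(Map<String, String> bootstrap, List<String> live) {
        PhasedOperator op = new PhasedOperator();
        SequentialSource source = new SequentialSource(bootstrap.entrySet().iterator(), live.iterator());
        List<String> out = new ArrayList<>();
        while (source.hasNext()) {
            String result = op.process(source.next());
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }
}
```

`PhasedOperator.runAll(bootstrap, live)` yields one "key=value" string per live record. This single-threaded model ignores parallelism. In a parallel job, the end-of-bootstrap marker would also have to reach every downstream operator instance before that instance can safely switch phases.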
On 5/3/18 10:49 AM, Piotr Nowojski wrote:
This is the recommended workaround for this issue: first start a job to precompute some values for the initial state, and then pass those values to the main job as (for example) a startup argument. I think for now it's the cleanest and easiest-to-maintain solution. If the initial state is too large for that, you could save it to a DFS and load it in the initialisation phase of the main job.

Piotrek
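Here is a minimal plain-Java sketch of this two-step approach, with a local file standing in for the DFS. `InitialStateFile`, `EnrichingFunction` and the tab-separated format are assumptions made for illustration. In Flink, the loading step would roughly correspond to a rich function's `open()` method. The sketch assumes keys contain no tabs and values contain no line breaks. For the startup-argument variant, it assumes that neither keys nor values contain ',' or '='.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Step 1, done by the precompute job: persist the initial state, one
// "key<TAB>value" line per entry (assumption: keys contain no tabs and
// values contain no line breaks).
final class InitialStateFile {
    static void write(Path path, Map<String, String> state) throws IOException {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, String> entry : state.entrySet()) {
            lines.add(entry.getKey() + "\t" + entry.getValue());
        }
        Files.write(path, lines, StandardCharsets.UTF_8);
    }

    static Map<String, String> read(Path path) throws IOException {
        Map<String, String> state = new HashMap<>();
        for (String line : Files.readAllLines(path, StandardCharsets.UTF_8)) {
            int tab = line.indexOf('\t'); // split on the first tab only
            if (tab >= 0) {
                state.put(line.substring(0, tab), line.substring(tab + 1));
            }
        }
        return state;
    }

    // Variant for small state passed as a startup argument, e.g. "a=1,b=2"
    // (assumption: keys and values contain neither ',' nor '=').
    static Map<String, String> fromArg(String arg) {
        Map<String, String> state = new HashMap<>();
        for (String pair : arg.split(",")) {
            int eq = pair.indexOf('=');
            if (eq >= 0) {
                state.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return state;
    }
}

// Step 2, in the main job: load the state once, before processing any record.
// Hypothetical class; in Flink this loading would live in a rich function's open().
final class EnrichingFunction {
    private Map<String, String> initial = new HashMap<>();

    void open(Path stateFile) throws IOException {
        initial = InitialStateFile.read(stateFile);
    }

    String map(String key) {
        return key + "=" + initial.getOrDefault(key, "?");
    }
}
```

If every parallel instance loads the whole map, the keying question raised earlier in the thread goes away. The cost is that each instance must hold the full initial state in memory.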