Hello User Community!
I am running some streaming SQL that involves a UNION ALL feeding into an OVER window, similar to the following:

SELECT user_id,
  count(action) OVER (PARTITION BY user_id ORDER BY rowtime RANGE INTERVAL '14' DAY PRECEDING) action_count,
  rowtime
FROM (
  SELECT rowtime, user_id, thing as action FROM user_things
  UNION ALL
  SELECT rowtime, user_id, stuff as action FROM user_stuff
)

The SQL generates three operators: two that process the FROM part of the query and feed into an 'over' operator. I notice that messages flow into the 'over' operator and just buffer there for a long time (hours in some cases). Eventually something happens and the data starts to flush through to the downstream operators. Can anyone help me understand what is causing this behavior? I want the data to flow through more consistently. Thanks!
Hi Greg. Based on a quick test, I cannot reproduce the issue: the query emits messages correctly in the ITCase environment. Can you share more information? Does the same problem happen if you use proctime? I suspect this is closely related to the watermark strategy you set on your input streams "user_things" and "user_stuff". -- Rong On Tue, Jun 26, 2018 at 6:37 PM Gregory Fee <[hidden email]> wrote:
Hi, The OVER window operator can only emit results when the watermark advances, because SQL semantics define that all records with the same timestamp must be processed together. Can you check whether the watermarks make sufficient progress? By the way, did you observe state size or IO issues? The OVER window operator also needs to keep the whole window interval in state (14 days in your case) so that it can retract data from the aggregates after 14 days. Every time the watermark moves, the operator iterates over all timestamps (per key) to check which records need to be removed. Best, Fabian 2018-06-27 5:38 GMT+02:00 Rong Rong <[hidden email]>:
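To make the emission rule above concrete, here is a minimal single-key sketch in plain Python. It is a toy model, not Flink's implementation or API: the class and method names are hypothetical, late rows are not handled, and the frame is assumed to be inclusive, covering timestamps in [ts - 14 days, ts]. Rows are only buffered on arrival; a watermark releases every buffered timestamp it has passed, and all rows sharing a timestamp receive the same count.

```python
from collections import defaultdict

DAY_MS = 24 * 60 * 60 * 1000
RANGE_MS = 14 * DAY_MS  # RANGE INTERVAL '14' DAY PRECEDING


class BufferedOverCount:
    """Single-key toy of a rowtime OVER count (hypothetical, not Flink code)."""

    def __init__(self, range_ms=RANGE_MS):
        self.range_ms = range_ms
        self.pending = defaultdict(int)  # timestamp -> rows waiting for a watermark
        self.window = {}                 # timestamp -> rows already aggregated
        self.watermark = float("-inf")

    def on_row(self, ts):
        # Arrival only buffers the row; nothing is emitted yet.
        self.pending[ts] += 1

    def on_watermark(self, wm):
        self.watermark = max(self.watermark, wm)
        out = []
        # Release every buffered timestamp the watermark has passed, in order.
        for ts in sorted(t for t in self.pending if t <= self.watermark):
            n = self.pending.pop(ts)
            self.window[ts] = n
            # Evict timestamps older than ts - range (the frame is inclusive).
            for old in [t for t in self.window if t < ts - self.range_ms]:
                del self.window[old]
            count = sum(self.window.values())
            # Rows sharing a timestamp are processed together and get one result.
            out.extend([(ts, count)] * n)
        return out


op = BufferedOverCount()
op.on_row(0)
op.on_row(DAY_MS)
op.on_row(DAY_MS)
print(op.on_watermark(-1))      # [] because no buffered timestamp is <= -1
print(op.on_watermark(DAY_MS))  # [(0, 1), (86400000, 3), (86400000, 3)]
```

Until a watermark passes a row's timestamp, the row simply sits in `pending`, which matches the buffering described in the original question.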
Thanks for your answers! Yes, it was related to watermarks. Fabian, the state does indeed grow quite a bit in my scenario; I've observed it in the range of 5 GB. That doesn't seem to be an issue in itself. However, in my scenario I'm loading a lot of data from a historic store that is only partitioned by day. As such, a full day's worth of data is loaded into the system before the watermark advances. At that point the checkpoints stall indefinitely, with a couple of the tasks in the 'over' operator never acknowledging. Any thoughts on what would cause that, or how to address it? On Wed, Jun 27, 2018 at 2:20 AM, Fabian Hueske <[hidden email]> wrote:
Gregory Fee
Hi Gregory, Since you are using a rowtime OVER window, this is probably a watermark problem: the window only outputs results when the watermarks make progress. You can use processing time (instead of rowtime) to verify this assumption. Also, make sure there is data in each of your source partitions. The watermarks make no progress if one of the source partitions has no data, because an operator’s current event time is the minimum of its input streams’ event times[1]. Best, Hequn On Thu, Jun 28, 2018 at 1:58 AM, Gregory Fee <[hidden email]> wrote:
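The minimum-of-inputs rule can be illustrated with a small Python sketch. It is plain Python rather than Flink's API, and the per-partition generator policy and all names are hypothetical. Each partition derives its watermark from the timestamps it has seen, and the consuming operator takes the minimum, so a single empty partition pins the result at "no watermark". In the query from this thread the same rule applies across the UNION ALL: the OVER operator can only advance as far as the slower of the user_things and user_stuff inputs.

```python
NO_WATERMARK = float("-inf")  # stands in for "no watermark received yet"


def partition_watermark(timestamps, max_out_of_orderness=0):
    """Hypothetical per-partition generator: highest timestamp seen minus a
    fixed bound. A partition without records never advances."""
    if not timestamps:
        return NO_WATERMARK
    return max(timestamps) - max_out_of_orderness


def operator_event_time(input_watermarks):
    """An operator's event time is the minimum over all of its inputs."""
    return min(input_watermarks)


partitions = {"p0": [5, 9, 12], "p1": [7, 8], "p2": []}
with_idle = operator_event_time(partition_watermark(ts) for ts in partitions.values())
without_idle = operator_event_time(
    partition_watermark(ts) for ts in partitions.values() if ts
)
print(with_idle, without_idle)  # -inf 8
```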
Yep, it was definitely a watermarking issue. I have that sorted out now. Thanks! On Wed, Jun 27, 2018 at 6:49 PM, Hequn Cheng <[hidden email]> wrote:
Gregory Fee
Hi Gregory, What was the cause of your problem? It would be great if you could share your experience; I think it will definitely help others. On Thu, Jun 28, 2018 at 11:30 AM, Gregory Fee <[hidden email]> wrote:
In a nutshell, the Over operator works as follows:
- When a row arrives, it is put into a MapState keyed on its timestamp, and a timer is registered to process it when the watermark passes that timestamp.
- All the heavy computation is done in the onTimer() method. For each unique timestamp, the Over operator iterates once over all records in the MapState to retract expired rows from the aggregation result (purging them from state) and to accumulate the new rows into it.
In Gregory's use case, many rows are added before the watermark advances. Once it does, the operator becomes very busy and iterates over the state many times to retract and accumulate rows. During that time the input stream, including the checkpoint barriers that travel with it, cannot be consumed, hence checkpoints stall. The solution that seems to work is to increase the watermark interval. However, we could also think about improving the implementation to reduce the number of state iterations. A time-sorted state primitive would make that much easier. Best, Fabian 2018-06-28 6:41 GMT+02:00 Hequn Cheng <[hidden email]>:
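The scheme described above can be sketched as a per-key Python model to show why a large backlog is expensive. It only mirrors the structure in the description and is not Flink's code: a dict stands in for the MapState, a set for the registered timers, COUNT is the aggregate, and rows arriving behind the watermark are dropped (an assumption of this sketch). Each fired timestamp makes one full pass over the state, and the hypothetical `visited` counter records how many entries those passes touch.

```python
class OverOperatorModel:
    """Per-key toy of the MapState + timer scheme (hypothetical, not Flink code)."""

    def __init__(self, range_ms):
        self.range_ms = range_ms
        self.state = {}            # timestamp -> rows (stands in for MapState)
        self.timers = set()        # one event-time timer per distinct timestamp
        self.watermark = float("-inf")
        self.agg = 0               # running COUNT over the window
        self.visited = 0           # state entries touched by on_timer passes

    def process_element(self, ts, row):
        if ts <= self.watermark:
            return                 # late row: dropped in this sketch
        self.state.setdefault(ts, []).append(row)
        self.timers.add(ts)

    def advance_watermark(self, wm):
        self.watermark = max(self.watermark, wm)
        out = []
        for ts in sorted(t for t in self.timers if t <= self.watermark):
            self.timers.discard(ts)
            out.extend(self.on_timer(ts))
        return out

    def on_timer(self, ts):
        limit = ts - self.range_ms
        expired = []
        for key in self.state:     # one full pass over the state per timestamp
            self.visited += 1
            if key < limit:
                expired.append(key)
        for key in expired:        # retract from the aggregate and purge
            self.agg -= len(self.state.pop(key))
        rows = self.state[ts]
        self.agg += len(rows)      # accumulate the new rows
        return [(ts, row, self.agg) for row in rows]


def run(burst, n=100, range_ms=10):
    op = OverOperatorModel(range_ms)
    out = []
    for ts in range(1, n + 1):
        op.process_element(ts, f"row{ts}")
        if not burst:
            out += op.advance_watermark(ts)  # watermark after every row
    if burst:
        out += op.advance_watermark(n)       # one large watermark jump at the end
    return op, out


burst_op, burst_out = run(burst=True)
inc_op, inc_out = run(burst=False)
print(burst_out == inc_out)              # True: same results either way
print(burst_op.visited, inc_op.visited)  # 6084 1134
```

Both runs produce identical results, but when all rows are buffered before one watermark jump, every pass also walks the not-yet-fired timestamps, so the toy touches far more state entries in a single uninterrupted burst.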
I'm writing a custom S3 source in order to work around some issues with back pressure and checkpointing at scale in my bootstrap logic. While doing so, I moved around the logic that assigns timestamps and watermarks. As a result, I ended up generating watermarks earlier in the pipeline, but another operator further along was stripping off all of the watermarks. On Wed, Jun 27, 2018 at 9:41 PM, Hequn Cheng <[hidden email]> wrote:
Gregory Fee
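The failure mode described in the previous message, an intermediate operator that forwards records but drops watermarks, can be modeled in a few lines of Python. This is a hypothetical toy pipeline, not Flink code: `Stage` and `downstream_event_time` are illustrative names. Downstream of the stripping stage, event time never advances, so an OVER operator placed there would keep buffering rows.

```python
NO_WATERMARK = float("-inf")


class Stage:
    """Toy pipeline stage. With forward_watermarks=False it passes records
    through but silently drops watermarks (hypothetical)."""

    def __init__(self, forward_watermarks=True):
        self.forward_watermarks = forward_watermarks

    def process(self, events):
        out = []
        for kind, value in events:
            if kind == "watermark" and not self.forward_watermarks:
                continue  # the watermark is lost at this stage
            out.append((kind, value))
        return out


def downstream_event_time(events):
    """Latest watermark observed downstream, or NO_WATERMARK if none arrived."""
    marks = [value for kind, value in events if kind == "watermark"]
    return max(marks, default=NO_WATERMARK)


stream = [("record", 1), ("watermark", 1), ("record", 5), ("watermark", 5)]
print(downstream_event_time(Stage().process(stream)))                          # 5
print(downstream_event_time(Stage(forward_watermarks=False).process(stream)))  # -inf
```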
Thanks! I'm working on a way to deliver the data in order (or closer to in order) and deliver watermarks more often. I'll let you know my results. On Thu, Jun 28, 2018 at 5:36 AM, Fabian Hueske <[hidden email]> wrote:
Gregory Fee
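The effect of delivering watermarks more often can be estimated with a small Python sketch. The generator here is hypothetical: it emits a bounded-out-of-orderness watermark (highest timestamp seen minus a fixed bound) after every `every_n` records, whereas Flink's periodic watermarks are emitted on a processing-time interval configured via `ExecutionConfig.setAutoWatermarkInterval`. The function reports the largest number of rows a single watermark releases, which roughly corresponds to the burst the OVER operator has to work through before it can read its input again.

```python
def largest_release(timestamps, every_n, max_out_of_orderness=0):
    """Largest number of buffered rows released by a single watermark."""
    max_ts = float("-inf")
    pending = []
    largest = 0
    for i, ts in enumerate(timestamps, start=1):
        max_ts = max(max_ts, ts)
        pending.append(ts)
        if i % every_n == 0:
            # Hypothetical policy: emit a watermark after every `every_n` records.
            wm = max_ts - max_out_of_orderness
            released = [t for t in pending if t <= wm]
            pending = [t for t in pending if t > wm]
            largest = max(largest, len(released))
    return largest


rows = list(range(1000))                     # already in timestamp order
print(largest_release(rows, every_n=1000))   # 1000: one watermark after the whole batch
print(largest_release(rows, every_n=10))     # 10
```

With in-order input a bound of zero is enough, and the burst size is set only by how often watermarks are emitted; out-of-order input needs a larger bound, which keeps rows pending longer.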