We have a SQL based flink job which is consume a very low volume stream (1 or 2 events in few hours):
SELECT user_id, COUNT(*) OVER (PARTITION BY user_id ORDER BY rowtime RANGE INTERVAL '30' DAY PRECEDING) as count_30_days, COALESCE(occurred_at, logged_at) AS latency_marker, rowtime FROM event_foo WHERE user_id IS NOT NULL The OVER operator seems to filter out events as per the flink dashboard (records received = <non-zero-number> records sent = 0) The operator looks like this: over: (PARTITION BY: $1, ORDER BY: rowtime, RANGEBETWEEN 2592000000 PRECEDING AND CURRENT ROW, select: (rowtime, $1, $2, COUNT(*) AS w0$o0)) -> select: ($1 AS user_id, w0$o0 AS count_30_days, $2 AS latency_marker, rowtime) -> to: Tuple2 -> Filter -> group_counter_count_30d.1.sqlRecords -> sample_without_formatter I know that the OVER operator can discard late arriving events, but these events are not arriving late for sure. The watermark for all operators stay at 0 because the output events is 0. We have an exactly same SQL job against a high volume stream that is working fine. Watermarks progress in timely manner and events are delivered in timely manner as well. Any idea what could be going wrong? Are the events getting buffered waiting for certain number of events? If so, what is the threshold? Thanks, Vinod |
(Forgot to mention that we are using Flink 1.4) Update: Earlier the OVER operator was assigned a parallelism of 64. I reduced it to 1 and the problem went away! Now the OVER operator is not filtering/buffering the events anymore. Can someone explain this please? Thanks, Vinod On Fri, Aug 23, 2019 at 3:09 PM Vinod Mehra <[hidden email]> wrote:
|
Although things improved during bootstrapping and when even volume was larger. As soon as the traffic slowed down the events are getting stuck (buffered?) at the OVER operator for a very long time. Several hours. On Fri, Aug 23, 2019 at 5:04 PM Vinod Mehra <[hidden email]> wrote:
|
When there are new events the old events just get stuck for many hours (more than a day). So if there is a buffering going on it seems it is not time based but size based (?). Looks like unless the buffered events exceed a certain threshold they don't get flushed out (?). Is that what is going on? Can someone confirm? Is there a way to flush out periodically? Thanks, Vinod On Fri, Aug 23, 2019 at 10:37 PM Vinod Mehra <[hidden email]> wrote:
|
Hi Vinod, This sounds like a watermark issue to me. The commonly used watermark strategies (like bounded out-of-order) are only advancing when there is a new record. Moreover, the current watermark is the minimum of the current watermarks of all input partitions. So, the watermark only moves forward if the watermark of the "most-behind" partition advances. If you have many parallel partitions and only very few records every hour, it might take a long time until "the right" partition processes a new record and hence advances its watermark. My recommendation would be to ensure that you have only one source that reads the records and assigns watermarks (maybe even keep the parallelism of the whole query to 1 if possible). Moreover, you might want to think about a more aggressive watermarking strategy that advances even if there is no data received based on processing time. Best, Fabian Am So., 25. Aug. 2019 um 20:51 Uhr schrieb Vinod Mehra <[hidden email]>:
|
Thanks Fabian for your detailed reply! > My recommendation would be to ensure that you have only one source that reads the records and assigns watermarks (maybe even keep the parallelism of the whole query to 1 if possible). Actually I have already experimented by reducing the parallelism to 1 as you can see in the following snapshot: records received = 122 records sent = 115 parallelism = 1 This implies there are 7 stuck records (unless it's a stats issue???). We are using event time based watermarking and keep it behind by 5 seconds from the latest event-time. I do see that watermark advances properly whenever a new event arrives. For this specific job the events arrive once in several hours, I was hoping to see only one undelivered event max, but the above stats give me a larger number. I am experimenting by reducing the 5 second delay to 0 to see if that changes anything. > Moreover, you might want to think about a more aggressive watermarking strategy that advances even if there is no data received based on processing time. Yes I am planning to experiment with this as well. Thank, Vinod On Mon, Aug 26, 2019 at 2:31 AM Fabian Hueske <[hidden email]> wrote:
|
Hi Vinod, The metrics that Flink collects are not consistent across the job, for example the reporting intervals of operators are not synchronized. The records might be currently in some send or receive buffer or "on the wire" or still buffered in state because the watermark was not advanced yet. I'd recommend to compare the actual input and output data. The metrics are good for rough checks (is the application processing data, are watermarks as expected, etc.). Best, Fabian Am Do., 29. Aug. 2019 um 00:38 Uhr schrieb Vinod Mehra <[hidden email]>:
|
Free forum by Nabble | Edit this page |