Hi, I have a Flink SQL streaming job defined by: SELECT … There is a noticeable delay in the groupBy operator. For example, I only see a record sent out 10 minutes after it was received (see the attached picture). I expect to see the group-by result after one minute, since the sliding window size is 1 minute and the slide is 30 seconds. There is no such issue when I run the job locally in IntelliJ; I only run into it when I run the job on EMR (Flink version 1.7). Can anybody give me a clue as to what could be wrong? Thanks, Fanbin
Hi Fanbin, The delay is most likely caused by the watermark delay. A window is computed when the watermark passes the end of the window. If you configured the watermark to be 10 minutes behind the current max timestamp (probably to account for out-of-order data), then the window will be computed with a delay of roughly 10 minutes. Best, Fabian On Tue, Jul 23, 2019 at 2:00 AM Fanbin Bu <[hidden email]> wrote:
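To make the timing concrete, here is a minimal Flink-free sketch of when a 1-minute window sliding by 30 seconds fires. It assumes zero window offset and non-negative millisecond timestamps, and it models the event-time trigger as firing once the watermark reaches the window's last millisecond (end - 1). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of event-time sliding windows.
final class WindowFiring {
    static final long SIZE_MS = 60_000;  // 1 min window size
    static final long SLIDE_MS = 30_000; // 30 s slide

    /** Start timestamps of all sliding windows that contain ts (zero offset, ts >= 0). */
    static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % SLIDE_MS);
        // A window [start, start + SIZE) contains ts iff start <= ts < start + SIZE.
        for (long start = lastStart; start > ts - SIZE_MS; start -= SLIDE_MS) {
            starts.add(start);
        }
        return starts;
    }

    /** The window fires once the watermark reaches its last timestamp (end - 1). */
    static boolean fires(long windowStart, long watermark) {
        return watermark >= windowStart + SIZE_MS - 1;
    }

    /** Smallest max-seen event time that fires the window when watermark = maxSeen - delay. */
    static long maxTsNeededToFire(long windowStart, long delayMs) {
        return windowStart + SIZE_MS - 1 + delayMs;
    }

    public static void main(String[] args) {
        System.out.println(windowStarts(45_000));          // [30000, 0]
        System.out.println(maxTsNeededToFire(0, 10_000));  // 69999 (10 s delay)
        System.out.println(maxTsNeededToFire(0, 600_000)); // 659999 (10 min delay)
    }
}
```

Each record belongs to two windows here, and the output of a window is held back by exactly the configured watermark delay on top of the window end, which is why a 10-minute watermark delay would show up as roughly a 10-minute lag.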
Thanks, Fabian, for the prompt reply. I just started using Flink, and this is a great community. The watermark setting only accounts for a 10-second delay. Also, the local job in IntelliJ runs fine without issues. Here is the code:
class EventTimestampExtractor(slack: Long = 0L) extends AssignerWithPeriodicWatermarks[T] { …
Are there any other things I should be aware of? Thanks again for your kind help! Fanbin On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske <[hidden email]> wrote:
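The body of EventTimestampExtractor did not survive, so the following is only a Flink-free sketch of what a periodic assigner with a fixed slack typically does, modelled on the bounded-out-of-orderness pattern: remember the highest event time seen and report a watermark that trails it by the slack. The class name SlackWatermarkTracker is hypothetical. Note that every parallel subtask would hold its own instance, and an instance that never sees a record stays at the lowest possible watermark.

```java
// Hypothetical, Flink-free model of a periodic watermark assigner with a fixed slack.
final class SlackWatermarkTracker {
    private final long slackMs;
    private long maxTimestamp;

    SlackWatermarkTracker(long slackMs) {
        this.slackMs = slackMs;
        // Start low enough that maxTimestamp - slackMs cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + slackMs;
    }

    /** Called per record: returns its event time and remembers the maximum seen so far. */
    long extractTimestamp(long eventTimeMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimeMs);
        return eventTimeMs;
    }

    /** Called periodically: the watermark trails the newest event time by slackMs. */
    long currentWatermark() {
        return maxTimestamp - slackMs;
    }

    public static void main(String[] args) {
        SlackWatermarkTracker t = new SlackWatermarkTracker(10_000); // 10 s slack
        System.out.println(t.currentWatermark() == Long.MIN_VALUE); // true: no data yet
        t.extractTimestamp(100_000);
        t.extractTimestamp(95_000); // an out-of-order record does not move the watermark back
        System.out.println(t.currentWatermark()); // 90000
    }
}
```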
Not sure whether this is related:
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( …
The parallelism is set to 32:
val env = StreamExecutionEnvironment.getExecutionEnvironment
And the command for launching the job is:
flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu <[hidden email]> wrote:
If I use proctime, the groupBy happens without any delay. On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu <[hidden email]> wrote:
Hi Fanbin, Fabian is right; it is most likely a watermark problem. Probably some of the source's tasks don't receive enough data to advance the watermark. You can also monitor event time through the Flink web interface. I have answered a similar question on Stack Overflow; see more details here[1]. Best, Hequn On Wed, Jul 24, 2019 at 4:38 AM Fanbin Bu <[hidden email]> wrote:
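Why a few quiet source tasks can stall everything downstream can be sketched without Flink: an operator's event time is the minimum of the watermarks arriving on its input channels. The sketch below is a hypothetical model (names are illustrative, and it leaves out any idleness handling a particular source connector might do) using 32 subtasks to mirror the parallelism in this thread.

```java
import java.util.Arrays;

// Hypothetical, Flink-free model of how a downstream operator combines input watermarks.
final class MinWatermark {
    // A subtask that has not emitted a watermark yet sits at the lowest possible event time.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Event time of the downstream operator: minimum over all input watermarks. */
    static long combined(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    public static void main(String[] args) {
        long[] wm = new long[32];      // 32 parallel source subtasks
        Arrays.fill(wm, NO_WATERMARK); // 31 subtasks never see a record...
        wm[0] = 1_000_000L;            // ...only one subtask receives data
        System.out.println(combined(wm) == NO_WATERMARK); // true: the window cannot fire

        Arrays.fill(wm, 400_000L);     // other subtasks see old data only occasionally
        wm[0] = 1_000_000L;
        System.out.println(combined(wm)); // 400000: event time lags the busy subtask

        System.out.println(combined(new long[] {1_000_000L})); // 1000000 with a single input
    }
}
```

The last line shows why a single source subtask avoids the problem: with one input, the combined watermark is simply that input's watermark.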
Hequn, Thanks for the help. It is indeed a watermark problem. In the Flink UI, I can see the low watermark for each operator, and the groupBy operator's watermark is lagging. I checked the link from SO and confirmed that:
1. I do see records coming in for this operator.
2. I have parallelism = 32 and only one task has the records. Can you please elaborate on why this would affect watermark advancement?
3. The event creation time is in ms.
4. The data span time > the window time. I don't quite understand why this matters.
Thanks, Fanbin On Tue, Jul 23, 2019 at 7:17 PM Hequn Cheng <[hidden email]> wrote:
Hi Fanbin,
> 2. I have parallelism = 32 and only one task has the record. Can you please elaborate more on why this would affect the watermark advancement?
Each parallel subtask of a source function usually generates its watermarks independently, say wk1, wk2, ..., wkn. The downstream window operator's current event time is the minimum of its input streams' event times, so here wk_window = min(wk1, wk2, ..., wkn). If some of the tasks don't have data, wk_window will not advance. More details here[1]. In your case, you can set the parallelism of the source to 1 to solve the problem, and also keep the parallelism of assignTimestampsAndWatermarks the same as that of the source.
> 4. data span time > window time. I don't quite understand why this matters.
For example, suppose you have a tumbling window with a size of 1 day, but all the data arrives within 1 hour of that day. In this case, the event time never reaches the end of the window, i.e., the window will not fire.
Best, Hequn On Thu, Jul 25, 2019 at 12:17 AM Fanbin Bu <[hidden email]> wrote:
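The second point (data span shorter than the window) can be checked with a small Flink-free sketch of a 1-day tumbling window. It assumes zero window offset, millisecond timestamps starting at 0, and a watermark that trails the newest event by a hypothetical 10-second slack; the class name is illustrative.

```java
// Hypothetical, Flink-free model of a 1-day tumbling event-time window.
final class TumblingDayWindow {
    static final long DAY_MS = 24L * 60 * 60 * 1000;
    static final long HOUR_MS = 60L * 60 * 1000;

    /** Start of the tumbling window containing ts (zero offset, ts >= 0). */
    static long windowStart(long ts) {
        return ts - (ts % DAY_MS);
    }

    /** The window holding ts fires once the watermark reaches the window's last millisecond. */
    static boolean fires(long ts, long watermark) {
        return watermark >= windowStart(ts) + DAY_MS - 1;
    }

    public static void main(String[] args) {
        long slackMs = 10_000;
        long lastEvent = HOUR_MS - 1;         // all data falls within the first hour of the day
        long watermark = lastEvent - slackMs; // the watermark trails the newest event
        System.out.println(fires(0, watermark));  // false: event time stops short of the day's end
        System.out.println(fires(0, DAY_MS - 1)); // true once event time reaches the end
    }
}
```

Without later data pushing the watermark past the end of the day, the window's result is never emitted.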