GroupBy result delay

Fanbin Bu
Hi,

I have a Flink SQL streaming job defined as follows:
SELECT
  user_id
  , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
  , count(name) as count
FROM event
WHERE name = 'signin'
GROUP BY
  user_id
  , hop(created_at, interval '30' second, interval '1' minute)

There is a noticeable delay in the groupBy operator. For example, a result record is only sent out about 10 minutes after the input record is received; see the attached picture.

[Attached screenshot: image.png]

I'm expecting to see the group-by result after about one minute, since the sliding window size is 1 minute and the slide is 30 seconds.

There is no such issue when I run the job locally in IntelliJ. I only run into it when I run the job on EMR (Flink version 1.7).

Can anybody give me a clue as to what could be wrong?
Thanks,

Fanbin

Re: GroupBy result delay

Fabian Hueske
Hi Fanbin,

The delay is most likely caused by the watermark delay.
A window is computed when the watermark passes the end of the window. If you configured the watermark to trail the current maximum timestamp by 10 minutes (probably to account for out-of-order data), then the window will be computed with a delay of approximately 10 minutes.
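The rule above can be checked with a small plain-Java simulation. This is a sketch, not Flink code: the class and method names are made up for illustration, it models a watermark that trails the highest timestamp seen by a fixed slack, and it assumes Flink's convention that an event-time window [start, end) fires once the watermark reaches end - 1. With a 30 s slide, a 1 min size and a 10 s slack, an event at 45 s falls into windows [0, 60000) and [30000, 90000), and the later window only fires after an event with a timestamp of at least 99999 ms has been seen.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (not Flink API code) of event-time hop windows.
// Assumptions: windows are [start, start + size), aligned to epoch 0, timestamps >= 0,
// and a window fires once the watermark reaches its last timestamp (end - 1).
final class HopWindowFiring {

    // Start timestamps of every hop window that contains the event timestamp ts.
    static List<Long> windowStarts(long ts, long slide, long size) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Watermark of an assigner that trails the largest timestamp seen by `slack` ms.
    static long watermark(long maxSeenTs, long slack) {
        return maxSeenTs - slack;
    }

    // True once the watermark has reached the window's last timestamp.
    static boolean fires(long windowStart, long size, long watermark) {
        return watermark >= windowStart + size - 1;
    }

    public static void main(String[] args) {
        long slide = 30_000L, size = 60_000L, slack = 10_000L;
        long eventTs = 45_000L; // one event at t = 45 s
        for (long start : windowStarts(eventTs, slide, size)) {
            // smallest "max event time seen" whose watermark makes this window fire
            long needed = start + size - 1 + slack;
            System.out.printf("window [%d, %d) fires once max event time >= %d%n",
                    start, start + size, needed);
        }
    }
}
```

The key point is that event time only moves when new events push the maximum timestamp forward, so a large slack (or missing data) translates directly into output delay.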

Best, Fabian

Re: GroupBy result delay

Fanbin Bu
Thanks, Fabian, for the prompt reply. I just started using Flink, and this is a great community.
The watermark setting only accounts for a 10-second delay. Besides that, the same job runs fine locally in IntelliJ.

Here is the code:

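import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical event type standing in for T; created_at is in epoch milliseconds
case class Event(user_id: String, name: String, created_at: Long)

// Watermarks trail the highest timestamp seen so far by `slack` milliseconds
class EventTimestampExtractor(slack: Long = 0L) extends AssignerWithPeriodicWatermarks[Event] {

  // highest (timestamp - slack) seen so far; 0 until the first element arrives
  var currentMaxTimestamp: Long = _

  override def extractTimestamp(e: Event, prevElementTimestamp: Long): Long = {
    val elemTs = e.created_at
    currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
    elemTs
  }

  // called by Flink once per auto-watermark interval
  override def getCurrentWatermark(): Watermark = {
    new Watermark(currentMaxTimestamp)
  }
}

// events: DataStream[Event]; slack = 10000 ms = 10 s
events.assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))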
Are there any other things I should be aware of?

Thanks again for your kind help!

Fanbin


Re: GroupBy result delay

Fanbin Bu
Not sure whether this is related:
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

    // match parallelism to input, otherwise dop=1 sources could lead to some strange
    // behaviour: the watermark will creep along very slowly because the elements
    // from the source go to each extraction operator round robin.
    final int inputParallelism = getTransformation().getParallelism();
    final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

    TimestampsAndPeriodicWatermarksOperator<T> operator =
            new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
            .setParallelism(inputParallelism);
}
The parallelism is set to 32:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(32)

and the command for launching the job is:

flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS

Re: GroupBy result delay

Fanbin Bu
If I use processing time (proctime) instead, the groupBy emits its results without any delay.

Re: GroupBy result delay

Hequn Cheng
Hi Fanbin,

Fabian is right; it should be a watermark problem. Probably some tasks of the source don't have enough data to advance the watermark. You can also monitor event time through the Flink web interface.
I have answered a similar question on Stack Overflow; see more details here [1].

Best, Hequn


Re: GroupBy result delay

Fanbin Bu
Hequn,

Thanks for the help. It is indeed a watermark problem. In the Flink UI, I can see the low watermark of each operator, and the watermark of the groupBy operator lags behind. I checked the link from SO and confirmed that:
1. I do see records coming in for this operator.
2. I have parallelism = 32, and only one task receives records. Can you please elaborate on why this would affect watermark advancement?
3. The event creation time is in ms.
4. The data span time is greater than the window time. I don't quite understand why this matters.

Thanks,
Fanbin

Re: GroupBy result delay

Hequn Cheng
Hi Fanbin,

> 2. I have parallelism = 32 and only one task has the record. Can you please elaborate more on why this would affect the watermark advancement?
Each parallel subtask of a source function usually generates its watermarks independently, say wk1, wk2, ..., wkn. The downstream window operator's current event time is the minimum of its input streams' event times, so here wk_window = min(wk1, wk2, ..., wkn).
If some of the tasks don't have data, wk_window will not advance. More details here [1].
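The min rule can be sketched as a tiny simulation. This is plain Java, not Flink's implementation; the class name is hypothetical, and an input channel whose subtask has seen no data is modeled as never having emitted a watermark (Long.MIN_VALUE). With 32 inputs and data on only one of them, the operator's event time does not move.

```java
import java.util.Arrays;

// Plain-Java simulation (not Flink code): an operator with several input channels
// tracks the latest watermark per channel and uses the minimum as its event time.
final class MinWatermark {

    private final long[] channelWatermarks;

    MinWatermark(int parallelism) {
        channelWatermarks = new long[parallelism];
        // a channel without any watermark yet holds the whole operator back
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one upstream subtask; per-channel watermarks never go back.
    void onWatermark(int channel, long wm) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], wm);
    }

    // The operator's current event time: the minimum over all input channels.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        MinWatermark op = new MinWatermark(32);
        op.onWatermark(0, 600_000L); // only subtask 0 sees data
        System.out.println(op.currentWatermark() == Long.MIN_VALUE); // prints true
        for (int i = 1; i < 32; i++) {
            op.onWatermark(i, 600_000L); // once every channel advances...
        }
        System.out.println(op.currentWatermark()); // prints 600000
    }
}
```

With a single input channel (`new MinWatermark(1)`), the one subtask that has data alone determines the operator's event time.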

In your case, you can set the parallelism of the source to 1 to solve the problem, and keep the parallelism of assignTimestampsAndWatermarks the same as that of the source.

> 4. data span time > window time. I don't quite understand why this matters.
For example, suppose you have a tumbling window with a size of 1 day, but all of the data falls within 1 hour of that day. In this case, the event time never reaches the end of the window, i.e., the window will not fire.
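The tumbling-window example can be expressed as a small plain-Java check. It is a sketch, not Flink code: the names are hypothetical, windows are assumed to be epoch-aligned [start, start + size) with non-negative timestamps, and a window is assumed to fire once the watermark reaches end - 1. If the data only covers 08:00 to 09:00, the watermark tops out around 09:00, so a 1-day window never fires while a 1-hour window does.

```java
// Plain-Java simulation (not Flink code): a tumbling event-time window fires only
// when the watermark reaches its last timestamp, so data covering only part of
// the window never closes it.
final class TumblingWindowCheck {

    static final long HOUR = 3_600_000L;
    static final long DAY = 24 * HOUR;

    // Start of the epoch-aligned tumbling window of length `size` containing ts (ts >= 0).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Whether the window containing ts fires, given the highest watermark ever reached.
    static boolean fires(long ts, long size, long maxWatermark) {
        return maxWatermark >= windowStart(ts, size) + size - 1;
    }

    public static void main(String[] args) {
        long firstEvent = 8 * HOUR;   // all events fall between 08:00 and 09:00
        long maxWatermark = 9 * HOUR; // so event time never gets past about 09:00
        System.out.println(fires(firstEvent, DAY, maxWatermark));  // prints false
        System.out.println(fires(firstEvent, HOUR, maxWatermark)); // prints true
    }
}
```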

Best, Hequn

