Query on retract stream

classic Classic list List threaded Threaded
13 messages Options
Reply | Threaded
Open this post in threaded view
|

Query on retract stream

Gagan Agrawal
Hi,
I have a requirement and need to understand if same can be achieved with Flink retract stream. Let's say we have stream with 4 attributes userId, orderId, status, event_time where orderId is unique and hence any change in same orderId updates previous value as below

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in mysql)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u4, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for respective order ids)

What I need is to maintain count of "Pending" orders against a user and if they go beyond configured threshold, then push that user and pending count to Kafka. Here there can be multiple updates to order status e.g Pending -> Success or Pending -> Failed. Also in some cases there may not be any change in status but we may still get a row (may be due to some other attribute update which we are not concerned about). So is it possible to have running count in flink as below at respective event times. Here Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's order status was changed from Pending to Success. Similarly for user u2, at time t6, there was no change in running count as there was no change in status for order o4

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is decreased for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)

As I understand may be retract stream can achieve this. However I am not sure how. Any samples around this would be of great help.

Gagan

Reply | Threaded
Open this post in threaded view
|

Re: Query on retract stream

Hequn Cheng
Hi Gagan,

Yes, you can achieve this with Flink TableAPI/SQL. However, you have to pay attention to the following things:
1) Currently, Flink only ingests append streams. In order to ingest upsert streams(steam with keys), you can use groupBy with a user-defined LAST_VALUE aggregate function. For implementation, you can refer to the MAX AggregateFunction(MAX always return the max value while LAST_VALUE always return the latest value). The SQL may look like:

SELECT user, COUNT(*)
FROM (
SELECT order, LAST_VALUE(user), LAST_VALUE(status), LAST_VALUE(event_time)
FROM SourceTable
GROUP BY order
)
WHERE status = 'pending'
GROUP BY user
You have to note that the query will be processed under processing time instead of event time. But I think it would be fine for you, as the final result will be right.

As for the upsert source, there is already a pr[1] on it, and it is under review now.

2) You have to note that once you output results to Kafka according to a configured threshold. The output record cannot be deleted anymore even the count value decreased. Because Kafka doesn't support delete messages. Also, this issue[2] make things worse. You can take a detailed look if you interested in it.

On Sat, Jan 19, 2019 at 1:31 AM Gagan Agrawal <[hidden email]> wrote:
Hi,
I have a requirement and need to understand if same can be achieved with Flink retract stream. Let's say we have stream with 4 attributes userId, orderId, status, event_time where orderId is unique and hence any change in same orderId updates previous value as below

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in mysql)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u4, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for respective order ids)

What I need is to maintain count of "Pending" orders against a user and if they go beyond configured threshold, then push that user and pending count to Kafka. Here there can be multiple updates to order status e.g Pending -> Success or Pending -> Failed. Also in some cases there may not be any change in status but we may still get a row (may be due to some other attribute update which we are not concerned about). So is it possible to have running count in flink as below at respective event times. Here Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's order status was changed from Pending to Success. Similarly for user u2, at time t6, there was no change in running count as there was no change in status for order o4

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is decreased for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)

As I understand may be retract stream can achieve this. However I am not sure how. Any samples around this would be of great help.

Gagan

Reply | Threaded
Open this post in threaded view
|

Re: Query on retract stream

Piotr Nowojski-2
In reply to this post by Gagan Agrawal
Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction streams as the input (or conversions from append stream to retraction stream) at the moment. With that your problem would simplify to one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with related work [1], so this might be supported in the next couple of months.

There might a workaround at the moment that could work. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recent aggregated row. With that you could write a query like this:

SELECT 
uid, count(*) 
FROM (
SELECT 
FROM (
SELECT 
uid, LAST_ROW(status)
FROM
changelog
GROUP BY
uid, oid)
WHERE status = `pending`)
GROUP BY
uid

Where `changelog` is an append only stream with the following content:

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6


Besides that, you could also write your own a relatively simple Data Stream application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

On 18 Jan 2019, at 18:30, Gagan Agrawal <[hidden email]> wrote:

Hi,
I have a requirement and need to understand if same can be achieved with Flink retract stream. Let's say we have stream with 4 attributes userId, orderId, status, event_time where orderId is unique and hence any change in same orderId updates previous value as below

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in mysql)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u4, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for respective order ids)

What I need is to maintain count of "Pending" orders against a user and if they go beyond configured threshold, then push that user and pending count to Kafka. Here there can be multiple updates to order status e.g Pending -> Success or Pending -> Failed. Also in some cases there may not be any change in status but we may still get a row (may be due to some other attribute update which we are not concerned about). So is it possible to have running count in flink as below at respective event times. Here Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's order status was changed from Pending to Success. Similarly for user u2, at time t6, there was no change in running count as there was no change in status for order o4

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is decreased for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)

As I understand may be retract stream can achieve this. However I am not sure how. Any samples around this would be of great help.

Gagan


Reply | Threaded
Open this post in threaded view
|

Re: Query on retract stream

Jeff Zhang
I am thinking of another approach instead of retract stream. Is it possible to define a custom window to do this ? This window is defined for each order. And then you just need to analyze the events in this window.

Piotr Nowojski <[hidden email]> 于2019年1月21日周一 下午8:44写道:
Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction streams as the input (or conversions from append stream to retraction stream) at the moment. With that your problem would simplify to one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with related work [1], so this might be supported in the next couple of months.

There might a workaround at the moment that could work. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recent aggregated row. With that you could write a query like this:

SELECT 
uid, count(*) 
FROM (
SELECT 
FROM (
SELECT 
uid, LAST_ROW(status)
FROM
changelog
GROUP BY
uid, oid)
WHERE status = `pending`)
GROUP BY
uid

Where `changelog` is an append only stream with the following content:

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6


Besides that, you could also write your own a relatively simple Data Stream application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

On 18 Jan 2019, at 18:30, Gagan Agrawal <[hidden email]> wrote:

Hi,
I have a requirement and need to understand if same can be achieved with Flink retract stream. Let's say we have stream with 4 attributes userId, orderId, status, event_time where orderId is unique and hence any change in same orderId updates previous value as below

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in mysql)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u4, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for respective order ids)

What I need is to maintain count of "Pending" orders against a user and if they go beyond configured threshold, then push that user and pending count to Kafka. Here there can be multiple updates to order status e.g Pending -> Success or Pending -> Failed. Also in some cases there may not be any change in status but we may still get a row (may be due to some other attribute update which we are not concerned about). So is it possible to have running count in flink as below at respective event times. Here Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's order status was changed from Pending to Success. Similarly for user u2, at time t6, there was no change in running count as there was no change in status for order o4

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is decreased for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)

As I understand may be retract stream can achieve this. However I am not sure how. Any samples around this would be of great help.

Gagan




--
Best Regards

Jeff Zhang
Reply | Threaded
Open this post in threaded view
|

Re: Query on retract stream

Piotr Nowojski-2
@Jeff: It depends if user can define a time window for his condition. As Gagan described his problem it was about “global” threshold of pending orders.



I have just thought about another solution that should work without any custom code. Converting “status” field to status_value int:
- "+1” for pending
- “-1” for success/failure
- “0” otherwise

Then running:

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

Query on top of such stream. Conversion to integers could be made by using `CASE` expression. 

One thing to note here is that probably all of the proposed solutions would work based on the order of the records, not based on the event_time.

Piotrek

On 21 Jan 2019, at 15:10, Jeff Zhang <[hidden email]> wrote:

I am thinking of another approach instead of retract stream. Is it possible to define a custom window to do this ? This window is defined for each order. And then you just need to analyze the events in this window.

Piotr Nowojski <[hidden email]> 于2019年1月21日周一 下午8:44写道:
Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction streams as the input (or conversions from append stream to retraction stream) at the moment. With that your problem would simplify to one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with related work [1], so this might be supported in the next couple of months.

There might a workaround at the moment that could work. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recent aggregated row. With that you could write a query like this:

SELECT 
uid, count(*) 
FROM (
SELECT 
FROM (
SELECT 
uid, LAST_ROW(status)
FROM
changelog
GROUP BY
uid, oid)
WHERE status = `pending`)
GROUP BY
uid

Where `changelog` is an append only stream with the following content:

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6


Besides that, you could also write your own a relatively simple Data Stream application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

On 18 Jan 2019, at 18:30, Gagan Agrawal <[hidden email]> wrote:

Hi,
I have a requirement and need to understand if same can be achieved with Flink retract stream. Let's say we have stream with 4 attributes userId, orderId, status, event_time where orderId is unique and hence any change in same orderId updates previous value as below

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in mysql)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u4, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for respective order ids)

What I need is to maintain count of "Pending" orders against a user and if they go beyond configured threshold, then push that user and pending count to Kafka. Here there can be multiple updates to order status e.g Pending -> Success or Pending -> Failed. Also in some cases there may not be any change in status but we may still get a row (may be due to some other attribute update which we are not concerned about). So is it possible to have running count in flink as below at respective event times. Here Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's order status was changed from Pending to Success. Similarly for user u2, at time t6, there was no change in running count as there was no change in status for order o4

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is decreased for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)

As I understand may be retract stream can achieve this. However I am not sure how. Any samples around this would be of great help.

Gagan




--
Best Regards

Jeff Zhang

Reply | Threaded
Open this post in threaded view
|

Re: Query on retract stream

Gagan Agrawal
Thank you guys. It's great to hear multiple solutions to achieve this. I understand that records once emitted to Kafka can not be deleted and that's acceptable for our use case as last updated value should always be correct. However as I understand most of these solutions will work for global aggregation which was asked in original question. But I also have requirement for event time based sliding window aggregation where same order count needs to be maintained for past x hours window (sliding at say every 5 minutes). Is it possible to achieve with Table Api / SQL at the moment or will require some custom implementation?

For window based upsert stream, there can be few scenarios.

1. An update to record key comes in same window. E.g Pending (t1) -> Success (t2) happens in same window w1. In this case once window aggregation is triggered/emitted, such records will be counted as 0
2. An update to record key belongs to same window but arrives late. In this case old(and already emitted)  window (w1) needs to be re-emitted with decreased value.
3. An update to record key comes in different window. E.g Pending (t1) in window w1 and Success (t2) in w2. I think in this case it may not require to re-emit old window w1 as it represents pending count till that window time (w1) which is still valid as record moved to Success in next window w2 (based on event time).

Gagan


On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski <[hidden email]> wrote:
@Jeff: It depends if user can define a time window for his condition. As Gagan described his problem it was about “global” threshold of pending orders.



I have just thought about another solution that should work without any custom code. Converting “status” field to status_value int:
- "+1” for pending
- “-1” for success/failure
- “0” otherwise

Then running:

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

Query on top of such stream. Conversion to integers could be made by using `CASE` expression. 

One thing to note here is that probably all of the proposed solutions would work based on the order of the records, not based on the event_time.

Piotrek

On 21 Jan 2019, at 15:10, Jeff Zhang <[hidden email]> wrote:

I am thinking of another approach instead of retract stream. Is it possible to define a custom window to do this ? This window is defined for each order. And then you just need to analyze the events in this window.

Piotr Nowojski <[hidden email]> 于2019年1月21日周一 下午8:44写道:
Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction streams as the input (or conversions from append stream to retraction stream) at the moment. With that your problem would simplify to one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with related work [1], so this might be supported in the next couple of months.

There might a workaround at the moment that could work. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recent aggregated row. With that you could write a query like this:

SELECT 
uid, count(*) 
FROM (
SELECT 
FROM (
SELECT 
uid, LAST_ROW(status)
FROM
changelog
GROUP BY
uid, oid)
WHERE status = `pending`)
GROUP BY
uid

Where `changelog` is an append only stream with the following content:

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6


Besides that, you could also write your own a relatively simple Data Stream application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

On 18 Jan 2019, at 18:30, Gagan Agrawal <[hidden email]> wrote:

Hi,
I have a requirement and need to understand if same can be achieved with Flink retract stream. Let's say we have stream with 4 attributes userId, orderId, status, event_time where orderId is unique and hence any change in same orderId updates previous value as below

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in mysql)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u4, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for respective order ids)

What I need is to maintain count of "Pending" orders against a user and if they go beyond configured threshold, then push that user and pending count to Kafka. Here there can be multiple updates to order status e.g Pending -> Success or Pending -> Failed. Also in some cases there may not be any change in status but we may still get a row (may be due to some other attribute update which we are not concerned about). So is it possible to have running count in flink as below at respective event times. Here Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's order status was changed from Pending to Success. Similarly for user u2, at time t6, there was no change in running count as there was no change in status for order o4

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is decreased for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)

As I understand may be retract stream can achieve this. However I am not sure how. Any samples around this would be of great help.

Gagan




--
Best Regards

Jeff Zhang

Reply | Threaded
Open this post in threaded view
|

Re: Query on retract stream

Hequn Cheng
Hi Gagan,

> But I also have a requirement for event time based sliding window aggregation

Yes, you can achieve this with Flink TableAPI/SQL. However, currently, sliding windows don't support early fire, i.e., only output results when event time reaches the end of the window. Once window fires, the window state will be cleared and late data belonging to this window will be ignored. In order to wait for the late event, you can extract watermark with an offset from the timestamp. For example, make watermark = timestamp - 5min. 

If event time and early fire is a strong requirement in your scenarios, you can probably use an over window[1] to solve your problem, say an over window with 1h preceding. Over window outputs a result for each input.

If the above solutions can't meet your requirements, you can write a DataStream job in which define your own window logic[2].

Best, Hequn




On Tue, Jan 22, 2019 at 12:58 AM Gagan Agrawal <[hidden email]> wrote:
Thank you guys. It's great to hear multiple solutions to achieve this. I understand that records once emitted to Kafka can not be deleted and that's acceptable for our use case as last updated value should always be correct. However as I understand most of these solutions will work for global aggregation which was asked in original question. But I also have requirement for event time based sliding window aggregation where same order count needs to be maintained for past x hours window (sliding at say every 5 minutes). Is it possible to achieve with Table Api / SQL at the moment or will require some custom implementation?

For window based upsert stream, there can be few scenarios.

1. An update to record key comes in same window. E.g Pending (t1) -> Success (t2) happens in same window w1. In this case once window aggregation is triggered/emitted, such records will be counted as 0
2. An update to record key belongs to same window but arrives late. In this case old(and already emitted)  window (w1) needs to be re-emitted with decreased value.
3. An update to record key comes in different window. E.g Pending (t1) in window w1 and Success (t2) in w2. I think in this case it may not require to re-emit old window w1 as it represents pending count till that window time (w1) which is still valid as record moved to Success in next window w2 (based on event time).

Gagan


On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski <[hidden email]> wrote:
@Jeff: It depends if user can define a time window for his condition. As Gagan described his problem it was about “global” threshold of pending orders.



I have just thought about another solution that should work without any custom code. Converting “status” field to status_value int:
- "+1” for pending
- “-1” for success/failure
- “0” otherwise

Then running:

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

Query on top of such stream. Conversion to integers could be made by using `CASE` expression. 

One thing to note here is that probably all of the proposed solutions would work based on the order of the records, not based on the event_time.

Piotrek

On 21 Jan 2019, at 15:10, Jeff Zhang <[hidden email]> wrote:

I am thinking of another approach instead of retract stream. Is it possible to define a custom window to do this ? This window is defined for each order. And then you just need to analyze the events in this window.

Piotr Nowojski <[hidden email]> 于2019年1月21日周一 下午8:44写道:
Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction streams as the input (or conversions from append stream to retraction stream) at the moment. With that your problem would simplify to one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with related work [1], so this might be supported in the next couple of months.

There might a workaround at the moment that could work. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recent aggregated row. With that you could write a query like this:

SELECT 
uid, count(*) 
FROM (
SELECT 
FROM (
SELECT 
uid, LAST_ROW(status)
FROM
changelog
GROUP BY
uid, oid)
WHERE status = `pending`)
GROUP BY
uid

Where `changelog` is an append only stream with the following content:

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6


Besides that, you could also write your own a relatively simple Data Stream application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

On 18 Jan 2019, at 18:30, Gagan Agrawal <[hidden email]> wrote:

Hi,
I have a requirement and need to understand if same can be achieved with Flink retract stream. Let's say we have stream with 4 attributes userId, orderId, status, event_time where orderId is unique and hence any change in same orderId updates previous value as below

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in mysql)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u4, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for respective order ids)

What I need is to maintain count of "Pending" orders against a user and if they go beyond configured threshold, then push that user and pending count to Kafka. Here there can be multiple updates to order status e.g Pending -> Success or Pending -> Failed. Also in some cases there may not be any change in status but we may still get a row (may be due to some other attribute update which we are not concerned about). So is it possible to have running count in flink as below at respective event times. Here Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's order status was changed from Pending to Success. Similarly for user u2, at time t6, there was no change in running count as there was no change in status for order o4

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is decreased for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)

As I understand may be retract stream can achieve this. However I am not sure how. Any samples around this would be of great help.

Gagan




--
Best Regards

Jeff Zhang

Reply | Threaded
Open this post in threaded view
|

Re: Query on retract stream

Gagan Agrawal
Thanks Hequn for your response. I initially thought of trying out "over window" clause, however as per documentation there seems to be limitation in "orderBy" clause where it allows only single time event/processing time attribute. Whereas in my case events are getting generated from mysql bin log where I have seen multiple event updates getting generated with same timestamp (may be because they are part of same transaction) and hence will need bin log offset along with timestamp to be able to sort them correctly. So looks like can't use "over window" until it allows multiple columns in "orderBy". I am exploring option of creating my own window as you suggested to be more flexible.

Gagan

On Tue, Jan 22, 2019 at 7:23 AM Hequn Cheng <[hidden email]> wrote:
Hi Gagan,

> But I also have a requirement for event time based sliding window aggregation

Yes, you can achieve this with Flink TableAPI/SQL. However, currently, sliding windows don't support early fire, i.e., only output results when event time reaches the end of the window. Once window fires, the window state will be cleared and late data belonging to this window will be ignored. In order to wait for the late event, you can extract watermark with an offset from the timestamp. For example, make watermark = timestamp - 5min. 

If event time and early fire is a strong requirement in your scenarios, you can probably use an over window[1] to solve your problem, say an over window with 1h preceding. Over window outputs a result for each input.

If the above solutions can't meet your requirements, you can write a DataStream job in which define your own window logic[2].

Best, Hequn




On Tue, Jan 22, 2019 at 12:58 AM Gagan Agrawal <[hidden email]> wrote:
Thank you guys. It's great to hear multiple solutions to achieve this. I understand that records once emitted to Kafka can not be deleted and that's acceptable for our use case as last updated value should always be correct. However as I understand most of these solutions will work for global aggregation which was asked in original question. But I also have requirement for event time based sliding window aggregation where same order count needs to be maintained for past x hours window (sliding at say every 5 minutes). Is it possible to achieve with Table Api / SQL at the moment or will require some custom implementation?

For window based upsert stream, there can be few scenarios.

1. An update to record key comes in same window. E.g Pending (t1) -> Success (t2) happens in same window w1. In this case once window aggregation is triggered/emitted, such records will be counted as 0
2. An update to record key belongs to same window but arrives late. In this case old(and already emitted)  window (w1) needs to be re-emitted with decreased value.
3. An update to record key comes in different window. E.g Pending (t1) in window w1 and Success (t2) in w2. I think in this case it may not require to re-emit old window w1 as it represents pending count till that window time (w1) which is still valid as record moved to Success in next window w2 (based on event time).

Gagan


On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski <[hidden email]> wrote:
@Jeff: It depends if user can define a time window for his condition. As Gagan described his problem it was about “global” threshold of pending orders.



I have just thought about another solution that should work without any custom code. Converting “status” field to status_value int:
- "+1” for pending
- “-1” for success/failure
- “0” otherwise

Then running:

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

Query on top of such stream. Conversion to integers could be made by using `CASE` expression. 

One thing to note here is that probably all of the proposed solutions would work based on the order of the records, not based on the event_time.

Piotrek

On 21 Jan 2019, at 15:10, Jeff Zhang <[hidden email]> wrote:

I am thinking of another approach instead of retract stream. Is it possible to define a custom window to do this ? This window is defined for each order. And then you just need to analyze the events in this window.

Piotr Nowojski <[hidden email]> 于2019年1月21日周一 下午8:44写道:
Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction streams as the input (or conversions from append stream to retraction stream) at the moment. With that your problem would simplify to one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with related work [1], so this might be supported in the next couple of months.

There might a workaround at the moment that could work. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recent aggregated row. With that you could write a query like this:

SELECT 
uid, count(*) 
FROM (
SELECT 
FROM (
SELECT 
uid, LAST_ROW(status)
FROM
changelog
GROUP BY
uid, oid)
WHERE status = `pending`)
GROUP BY
uid

Where `changelog` is an append only stream with the following content:

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6


Besides that, you could also write your own a relatively simple Data Stream application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

On 18 Jan 2019, at 18:30, Gagan Agrawal <[hidden email]> wrote:

Hi,
I have a requirement and need to understand if same can be achieved with Flink retract stream. Let's say we have stream with 4 attributes userId, orderId, status, event_time where orderId is unique and hence any change in same orderId updates previous value as below

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in mysql)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u4, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for respective order ids)

What I need is to maintain count of "Pending" orders against a user and if they go beyond configured threshold, then push that user and pending count to Kafka. Here there can be multiple updates to order status e.g Pending -> Success or Pending -> Failed. Also in some cases there may not be any change in status but we may still get a row (may be due to some other attribute update which we are not concerned about). So is it possible to have running count in flink as below at respective event times. Here Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's order status was changed from Pending to Success. Similarly for user u2, at time t6, there was no change in running count as there was no change in status for order o4

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is decreased for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)

As I understand may be retract stream can achieve this. However I am not sure how. Any samples around this would be of great help.

Gagan




--
Best Regards

Jeff Zhang

Reply | Threaded
Open this post in threaded view
|

Re: Query on retract stream

Gagan Agrawal
Based on the suggestions in this mail thread, I tried out few experiments on upsert stream with flink 1.7.1 and here is the issue I am facing with window stream.

1. Global Pending order count. 
Following query works fine and it's able to handle updates as per original requirement.

select userId, count(orderId) from
(select orderId, lastValue(userId) as userId, lastValue(status) as status from orders group by orderId)
where status='PENDING' group by userId

2. Last 1 Hour tumbling window count (Append stream)
Though following query doesn't handle upsert stream, I just tried to make sure time column is working fine. This is working, but as expected, it doesn't handle updates on orderId.

select userId, count(orderId) from orders
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

3. Last 1 Hour tumbling window count (With upsert stream)
Now I tried combination of above two where input stream is converted to upsert stream (via lastValue aggregate function) and then Pending count needs to be calculated in last 1 hour window.

select userId, count(orderId) from
(select orderId, orderTime, lastValue(userId) as userId, lastValue(status) as status from orders group by orderId, orderTime)
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

This one gives me following error. Is this because I have added orderTime in group by/select clause and hence it's time characteristics have changed? What is the workaround here as without adding orderTime, I can not perform window aggregation on upsert stream.

[error] Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
[error]         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
[error]         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:89)
[error]         at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
[error]         at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
[error]         at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
[error]         at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
[error]         at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
[error]         at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
[error]         at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
[error]         at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
[error]         at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
[error]         at org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
[error]         at org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
[error]         at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:811)
[error]         at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
[error]         at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
[error]         at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)

Gagan

On Tue, Jan 22, 2019 at 7:01 PM Gagan Agrawal <[hidden email]> wrote:
Thanks Hequn for your response. I initially thought of trying out "over window" clause, however as per documentation there seems to be limitation in "orderBy" clause where it allows only single time event/processing time attribute. Whereas in my case events are getting generated from mysql bin log where I have seen multiple event updates getting generated with same timestamp (may be because they are part of same transaction) and hence will need bin log offset along with timestamp to be able to sort them correctly. So looks like can't use "over window" until it allows multiple columns in "orderBy". I am exploring option of creating my own window as you suggested to be more flexible.

Gagan

On Tue, Jan 22, 2019 at 7:23 AM Hequn Cheng <[hidden email]> wrote:
Hi Gagan,

> But I also have a requirement for event time based sliding window aggregation

Yes, you can achieve this with Flink TableAPI/SQL. However, currently, sliding windows don't support early fire, i.e., only output results when event time reaches the end of the window. Once window fires, the window state will be cleared and late data belonging to this window will be ignored. In order to wait for the late event, you can extract watermark with an offset from the timestamp. For example, make watermark = timestamp - 5min. 

If event time and early fire is a strong requirement in your scenarios, you can probably use an over window[1] to solve your problem, say an over window with 1h preceding. Over window outputs a result for each input.

If the above solutions can't meet your requirements, you can write a DataStream job in which define your own window logic[2].

Best, Hequn




On Tue, Jan 22, 2019 at 12:58 AM Gagan Agrawal <[hidden email]> wrote:
Thank you guys. It's great to hear multiple solutions to achieve this. I understand that records once emitted to Kafka can not be deleted and that's acceptable for our use case as last updated value should always be correct. However as I understand most of these solutions will work for global aggregation which was asked in original question. But I also have requirement for event time based sliding window aggregation where same order count needs to be maintained for past x hours window (sliding at say every 5 minutes). Is it possible to achieve with Table Api / SQL at the moment or will require some custom implementation?

For window based upsert stream, there can be few scenarios.

1. An update to record key comes in same window. E.g Pending (t1) -> Success (t2) happens in same window w1. In this case once window aggregation is triggered/emitted, such records will be counted as 0
2. An update to record key belongs to same window but arrives late. In this case old(and already emitted)  window (w1) needs to be re-emitted with decreased value.
3. An update to record key comes in different window. E.g Pending (t1) in window w1 and Success (t2) in w2. I think in this case it may not require to re-emit old window w1 as it represents pending count till that window time (w1) which is still valid as record moved to Success in next window w2 (based on event time).

Gagan


On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski <[hidden email]> wrote:
@Jeff: It depends if user can define a time window for his condition. As Gagan described his problem it was about “global” threshold of pending orders.



I have just thought about another solution that should work without any custom code. Converting “status” field to status_value int:
- "+1” for pending
- “-1” for success/failure
- “0” otherwise

Then running:

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

Query on top of such stream. Conversion to integers could be made by using `CASE` expression. 

One thing to note here is that probably all of the proposed solutions would work based on the order of the records, not based on the event_time.

Piotrek

On 21 Jan 2019, at 15:10, Jeff Zhang <[hidden email]> wrote:

I am thinking of another approach instead of retract stream. Is it possible to define a custom window to do this ? This window is defined for each order. And then you just need to analyze the events in this window.

Piotr Nowojski <[hidden email]> 于2019年1月21日周一 下午8:44写道:
Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction streams as the input (or conversions from append stream to retraction stream) at the moment. With that your problem would simplify to one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with related work [1], so this might be supported in the next couple of months.

There might a workaround at the moment that could work. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recent aggregated row. With that you could write a query like this:

SELECT 
uid, count(*) 
FROM (
SELECT 
FROM (
SELECT 
uid, LAST_ROW(status)
FROM
changelog
GROUP BY
uid, oid)
WHERE status = `pending`)
GROUP BY
uid

Where `changelog` is an append only stream with the following content:

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6


Besides that, you could also write your own a relatively simple Data Stream application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

On 18 Jan 2019, at 18:30, Gagan Agrawal <[hidden email]> wrote:

Hi,
I have a requirement and need to understand if same can be achieved with Flink retract stream. Let's say we have stream with 4 attributes userId, orderId, status, event_time where orderId is unique and hence any change in same orderId updates previous value as below

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in mysql)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u4, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for respective order ids)

What I need is to maintain count of "Pending" orders against a user and if they go beyond configured threshold, then push that user and pending count to Kafka. Here there can be multiple updates to order status e.g Pending -> Success or Pending -> Failed. Also in some cases there may not be any change in status but we may still get a row (may be due to some other attribute update which we are not concerned about). So is it possible to have running count in flink as below at respective event times. Here Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's order status was changed from Pending to Success. Similarly for user u2, at time t6, there was no change in running count as there was no change in status for order o4

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is decreased for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)

As I understand may be retract stream can achieve this. However I am not sure how. Any samples around this would be of great help.

Gagan




--
Best Regards

Jeff Zhang

Reply | Threaded
Open this post in threaded view
|

Re: Query on retract stream

Hequn Cheng
Hi Gagan,

Time attribute fields will be materialized by the unbounded groupby. Also, currently, the window doesn't have the ability to handle retraction messages. I see two ways to solve the problem.

- Use multi-window.  The first window performs lastValue, the second performs count.
- Use two non-window aggregates. In this case, you don't have to change anything for the first aggregate. For the second one, you can group by an hour field and perform count(). The code looks like:

SELECT userId,
         count(orderId)
FROM 
    (SELECT orderId,
         getHour(orderTime) as myHour,
         lastValue(userId) AS userId,
         lastValue(status) AS status
    FROM orders
    GROUP BY  orderId, orderTime)
WHERE status='PENDING'
GROUP BY myHour, userId 

Best,
Hequn

 


On Sat, Jan 26, 2019 at 12:29 PM Gagan Agrawal <[hidden email]> wrote:
Based on the suggestions in this mail thread, I tried out few experiments on upsert stream with flink 1.7.1 and here is the issue I am facing with window stream.

1. Global Pending order count. 
Following query works fine and it's able to handle updates as per original requirement.

select userId, count(orderId) from
(select orderId, lastValue(userId) as userId, lastValue(status) as status from orders group by orderId)
where status='PENDING' group by userId

2. Last 1 Hour tumbling window count (Append stream)
Though following query doesn't handle upsert stream, I just tried to make sure time column is working fine. This is working, but as expected, it doesn't handle updates on orderId.

select userId, count(orderId) from orders
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

3. Last 1 Hour tumbling window count (With upsert stream)
Now I tried combination of above two where input stream is converted to upsert stream (via lastValue aggregate function) and then Pending count needs to be calculated in last 1 hour window.

select userId, count(orderId) from
(select orderId, orderTime, lastValue(userId) as userId, lastValue(status) as status from orders group by orderId, orderTime)
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

This one gives me following error. Is this because I have added orderTime in group by/select clause and hence it's time characteristics have changed? What is the workaround here as without adding orderTime, I can not perform window aggregation on upsert stream.

[error] Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
[error]         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
[error]         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:89)
[error]         at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
[error]         at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
[error]         at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
[error]         at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
[error]         at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
[error]         at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
[error]         at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
[error]         at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
[error]         at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
[error]         at org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
[error]         at org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
[error]         at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:811)
[error]         at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
[error]         at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
[error]         at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)

Gagan

On Tue, Jan 22, 2019 at 7:01 PM Gagan Agrawal <[hidden email]> wrote:
Thanks Hequn for your response. I initially thought of trying out "over window" clause, however as per documentation there seems to be limitation in "orderBy" clause where it allows only single time event/processing time attribute. Whereas in my case events are getting generated from mysql bin log where I have seen multiple event updates getting generated with same timestamp (may be because they are part of same transaction) and hence will need bin log offset along with timestamp to be able to sort them correctly. So looks like can't use "over window" until it allows multiple columns in "orderBy". I am exploring option of creating my own window as you suggested to be more flexible.

Gagan

On Tue, Jan 22, 2019 at 7:23 AM Hequn Cheng <[hidden email]> wrote:
Hi Gagan,

> But I also have a requirement for event time based sliding window aggregation

Yes, you can achieve this with Flink TableAPI/SQL. However, currently, sliding windows don't support early fire, i.e., only output results when event time reaches the end of the window. Once window fires, the window state will be cleared and late data belonging to this window will be ignored. In order to wait for the late event, you can extract watermark with an offset from the timestamp. For example, make watermark = timestamp - 5min. 

If event time and early fire is a strong requirement in your scenarios, you can probably use an over window[1] to solve your problem, say an over window with 1h preceding. Over window outputs a result for each input.

If the above solutions can't meet your requirements, you can write a DataStream job in which define your own window logic[2].

Best, Hequn




On Tue, Jan 22, 2019 at 12:58 AM Gagan Agrawal <[hidden email]> wrote:
Thank you guys. It's great to hear multiple solutions to achieve this. I understand that records once emitted to Kafka can not be deleted and that's acceptable for our use case as last updated value should always be correct. However as I understand most of these solutions will work for global aggregation which was asked in original question. But I also have requirement for event time based sliding window aggregation where same order count needs to be maintained for past x hours window (sliding at say every 5 minutes). Is it possible to achieve with Table Api / SQL at the moment or will require some custom implementation?

For window based upsert stream, there can be few scenarios.

1. An update to record key comes in same window. E.g Pending (t1) -> Success (t2) happens in same window w1. In this case once window aggregation is triggered/emitted, such records will be counted as 0
2. An update to record key belongs to same window but arrives late. In this case old(and already emitted)  window (w1) needs to be re-emitted with decreased value.
3. An update to record key comes in different window. E.g Pending (t1) in window w1 and Success (t2) in w2. I think in this case it may not require to re-emit old window w1 as it represents pending count till that window time (w1) which is still valid as record moved to Success in next window w2 (based on event time).

Gagan


On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski <[hidden email]> wrote:
@Jeff: It depends if user can define a time window for his condition. As Gagan described his problem it was about “global” threshold of pending orders.



I have just thought about another solution that should work without any custom code. Converting “status” field to status_value int:
- "+1” for pending
- “-1” for success/failure
- “0” otherwise

Then running:

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

Query on top of such stream. Conversion to integers could be made by using `CASE` expression. 

One thing to note here is that probably all of the proposed solutions would work based on the order of the records, not based on the event_time.

Piotrek

On 21 Jan 2019, at 15:10, Jeff Zhang <[hidden email]> wrote:

I am thinking of another approach instead of retract stream. Is it possible to define a custom window to do this ? This window is defined for each order. And then you just need to analyze the events in this window.

Piotr Nowojski <[hidden email]> 于2019年1月21日周一 下午8:44写道:
Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction streams as the input (or conversions from append stream to retraction stream) at the moment. With that your problem would simplify to one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with related work [1], so this might be supported in the next couple of months.

There might a workaround at the moment that could work. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recent aggregated row. With that you could write a query like this:

SELECT 
uid, count(*) 
FROM (
SELECT 
FROM (
SELECT 
uid, LAST_ROW(status)
FROM
changelog
GROUP BY
uid, oid)
WHERE status = `pending`)
GROUP BY
uid

Where `changelog` is an append only stream with the following content:

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6


Besides that, you could also write your own a relatively simple Data Stream application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

On 18 Jan 2019, at 18:30, Gagan Agrawal <[hidden email]> wrote:

Hi,
I have a requirement and need to understand if same can be achieved with Flink retract stream. Let's say we have stream with 4 attributes userId, orderId, status, event_time where orderId is unique and hence any change in same orderId updates previous value as below

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in mysql)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u4, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for respective order ids)

What I need is to maintain count of "Pending" orders against a user and if they go beyond configured threshold, then push that user and pending count to Kafka. Here there can be multiple updates to order status e.g Pending -> Success or Pending -> Failed. Also in some cases there may not be any change in status but we may still get a row (may be due to some other attribute update which we are not concerned about). So is it possible to have running count in flink as below at respective event times. Here Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's order status was changed from Pending to Success. Similarly for user u2, at time t6, there was no change in running count as there was no change in status for order o4

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is decreased for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)

As I understand may be retract stream can achieve this. However I am not sure how. Any samples around this would be of great help.

Gagan




--
Best Regards

Jeff Zhang

Reply | Threaded
Open this post in threaded view
|

Re: Query on retract stream

Gagan Agrawal
Thanks Hequn for suggested solutions and I think this should really work and will give it a try. As I understand First solution  of using multiple windows will be good for those scenarios where I want output to be generated post window is materialized (i.e. watermark reaches end of window). And second will be good if I want it to be fired on per event basis (i.e no watermarking). Apart from this, do you see any difference from performance perspective in choosing between the two or both should be equally performant?

Gagan

On Sat, Jan 26, 2019 at 11:50 AM Hequn Cheng <[hidden email]> wrote:
Hi Gagan,

Time attribute fields will be materialized by the unbounded groupby. Also, currently, the window doesn't have the ability to handle retraction messages. I see two ways to solve the problem.

- Use multi-window.  The first window performs lastValue, the second performs count.
- Use two non-window aggregates. In this case, you don't have to change anything for the first aggregate. For the second one, you can group by an hour field and perform count(). The code looks like:

SELECT userId,
         count(orderId)
FROM 
    (SELECT orderId,
         getHour(orderTime) as myHour,
         lastValue(userId) AS userId,
         lastValue(status) AS status
    FROM orders
    GROUP BY  orderId, orderTime)
WHERE status='PENDING'
GROUP BY myHour, userId 

Best,
Hequn

 


On Sat, Jan 26, 2019 at 12:29 PM Gagan Agrawal <[hidden email]> wrote:
Based on the suggestions in this mail thread, I tried out few experiments on upsert stream with flink 1.7.1 and here is the issue I am facing with window stream.

1. Global Pending order count. 
Following query works fine and it's able to handle updates as per original requirement.

select userId, count(orderId) from
(select orderId, lastValue(userId) as userId, lastValue(status) as status from orders group by orderId)
where status='PENDING' group by userId

2. Last 1 Hour tumbling window count (Append stream)
Though following query doesn't handle upsert stream, I just tried to make sure time column is working fine. This is working, but as expected, it doesn't handle updates on orderId.

select userId, count(orderId) from orders
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

3. Last 1 Hour tumbling window count (With upsert stream)
Now I tried combination of above two where input stream is converted to upsert stream (via lastValue aggregate function) and then Pending count needs to be calculated in last 1 hour window.

select userId, count(orderId) from
(select orderId, orderTime, lastValue(userId) as userId, lastValue(status) as status from orders group by orderId, orderTime)
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

This one gives me following error. Is this because I have added orderTime in group by/select clause and hence it's time characteristics have changed? What is the workaround here as without adding orderTime, I can not perform window aggregation on upsert stream.

[error] Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
[error]         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
[error]         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:89)
[error]         at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
[error]         at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
[error]         at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
[error]         at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
[error]         at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
[error]         at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
[error]         at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
[error]         at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
[error]         at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
[error]         at org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
[error]         at org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
[error]         at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:811)
[error]         at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
[error]         at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
[error]         at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)

Gagan

On Tue, Jan 22, 2019 at 7:01 PM Gagan Agrawal <[hidden email]> wrote:
Thanks Hequn for your response. I initially thought of trying out "over window" clause, however as per documentation there seems to be limitation in "orderBy" clause where it allows only single time event/processing time attribute. Whereas in my case events are getting generated from mysql bin log where I have seen multiple event updates getting generated with same timestamp (may be because they are part of same transaction) and hence will need bin log offset along with timestamp to be able to sort them correctly. So looks like can't use "over window" until it allows multiple columns in "orderBy". I am exploring option of creating my own window as you suggested to be more flexible.

Gagan

On Tue, Jan 22, 2019 at 7:23 AM Hequn Cheng <[hidden email]> wrote:
Hi Gagan,

> But I also have a requirement for event time based sliding window aggregation

Yes, you can achieve this with Flink TableAPI/SQL. However, currently, sliding windows don't support early fire, i.e., only output results when event time reaches the end of the window. Once window fires, the window state will be cleared and late data belonging to this window will be ignored. In order to wait for the late event, you can extract watermark with an offset from the timestamp. For example, make watermark = timestamp - 5min. 

If event time and early fire is a strong requirement in your scenarios, you can probably use an over window[1] to solve your problem, say an over window with 1h preceding. Over window outputs a result for each input.

If the above solutions can't meet your requirements, you can write a DataStream job in which define your own window logic[2].

Best, Hequn




On Tue, Jan 22, 2019 at 12:58 AM Gagan Agrawal <[hidden email]> wrote:
Thank you guys. It's great to hear multiple solutions to achieve this. I understand that records once emitted to Kafka can not be deleted and that's acceptable for our use case as last updated value should always be correct. However as I understand most of these solutions will work for global aggregation which was asked in original question. But I also have requirement for event time based sliding window aggregation where same order count needs to be maintained for past x hours window (sliding at say every 5 minutes). Is it possible to achieve with Table Api / SQL at the moment or will require some custom implementation?

For window based upsert stream, there can be few scenarios.

1. An update to record key comes in same window. E.g Pending (t1) -> Success (t2) happens in same window w1. In this case once window aggregation is triggered/emitted, such records will be counted as 0
2. An update to record key belongs to same window but arrives late. In this case old(and already emitted)  window (w1) needs to be re-emitted with decreased value.
3. An update to record key comes in different window. E.g Pending (t1) in window w1 and Success (t2) in w2. I think in this case it may not require to re-emit old window w1 as it represents pending count till that window time (w1) which is still valid as record moved to Success in next window w2 (based on event time).

Gagan


On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski <[hidden email]> wrote:
@Jeff: It depends if user can define a time window for his condition. As Gagan described his problem it was about “global” threshold of pending orders.



I have just thought about another solution that should work without any custom code. Converting “status” field to status_value int:
- "+1” for pending
- “-1” for success/failure
- “0” otherwise

Then running:

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

Query on top of such stream. Conversion to integers could be made by using `CASE` expression. 

One thing to note here is that probably all of the proposed solutions would work based on the order of the records, not based on the event_time.

Piotrek

On 21 Jan 2019, at 15:10, Jeff Zhang <[hidden email]> wrote:

I am thinking of another approach instead of retract stream. Is it possible to define a custom window to do this ? This window is defined for each order. And then you just need to analyze the events in this window.

Piotr Nowojski <[hidden email]> 于2019年1月21日周一 下午8:44写道:
Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction streams as the input (or conversions from append stream to retraction stream) at the moment. With that your problem would simplify to one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with related work [1], so this might be supported in the next couple of months.

There might a workaround at the moment that could work. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recent aggregated row. With that you could write a query like this:

SELECT 
uid, count(*) 
FROM (
SELECT 
FROM (
SELECT 
uid, LAST_ROW(status)
FROM
changelog
GROUP BY
uid, oid)
WHERE status = `pending`)
GROUP BY
uid

Where `changelog` is an append only stream with the following content:

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6


Besides that, you could also write your own a relatively simple Data Stream application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

On 18 Jan 2019, at 18:30, Gagan Agrawal <[hidden email]> wrote:

Hi,
I have a requirement and need to understand if same can be achieved with Flink retract stream. Let's say we have stream with 4 attributes userId, orderId, status, event_time where orderId is unique and hence any change in same orderId updates previous value as below

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in mysql)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u4, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for respective order ids)

What I need is to maintain count of "Pending" orders against a user and if they go beyond configured threshold, then push that user and pending count to Kafka. Here there can be multiple updates to order status e.g Pending -> Success or Pending -> Failed. Also in some cases there may not be any change in status but we may still get a row (may be due to some other attribute update which we are not concerned about). So is it possible to have running count in flink as below at respective event times. Here Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's order status was changed from Pending to Success. Similarly for user u2, at time t6, there was no change in running count as there was no change in status for order o4

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is decreased for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)

As I understand may be retract stream can achieve this. However I am not sure how. Any samples around this would be of great help.

Gagan




--
Best Regards

Jeff Zhang

Reply | Threaded
Open this post in threaded view
|

Re: Query on retract stream

Hequn Cheng
Hi Gagan,

Besides the eventime and proctime difference, there is another difference between the two ways. The window aggregate on bounded data, while unbounded aggregate on unbounded data, i.e., the new coming data can update a very old data. 

As for the performance, I think the two ways may have no big difference in current Flink version. Maybe you can run some tests between them on your own scenarios if both of them can solve your problem. FYI: There is a nice discussion[1] raised by Timo recently. Once Blink is merged into Flink, the unbounded aggregate will be much faster than the window.

Best,
Hequn



On Sat, Jan 26, 2019 at 4:11 PM Gagan Agrawal <[hidden email]> wrote:
Thanks Hequn for suggested solutions and I think this should really work and will give it a try. As I understand First solution  of using multiple windows will be good for those scenarios where I want output to be generated post window is materialized (i.e. watermark reaches end of window). And second will be good if I want it to be fired on per event basis (i.e no watermarking). Apart from this, do you see any difference from performance perspective in choosing between the two or both should be equally performant?

Gagan

On Sat, Jan 26, 2019 at 11:50 AM Hequn Cheng <[hidden email]> wrote:
Hi Gagan,

Time attribute fields will be materialized by the unbounded groupby. Also, currently, the window doesn't have the ability to handle retraction messages. I see two ways to solve the problem.

- Use multi-window.  The first window performs lastValue, the second performs count.
- Use two non-window aggregates. In this case, you don't have to change anything for the first aggregate. For the second one, you can group by an hour field and perform count(). The code looks like:

SELECT userId,
         count(orderId)
FROM 
    (SELECT orderId,
         getHour(orderTime) as myHour,
         lastValue(userId) AS userId,
         lastValue(status) AS status
    FROM orders
    GROUP BY  orderId, orderTime)
WHERE status='PENDING'
GROUP BY myHour, userId 

Best,
Hequn

 


On Sat, Jan 26, 2019 at 12:29 PM Gagan Agrawal <[hidden email]> wrote:
Based on the suggestions in this mail thread, I tried out few experiments on upsert stream with flink 1.7.1 and here is the issue I am facing with window stream.

1. Global Pending order count. 
Following query works fine and it's able to handle updates as per original requirement.

select userId, count(orderId) from
(select orderId, lastValue(userId) as userId, lastValue(status) as status from orders group by orderId)
where status='PENDING' group by userId

2. Last 1 Hour tumbling window count (Append stream)
Though following query doesn't handle upsert stream, I just tried to make sure time column is working fine. This is working, but as expected, it doesn't handle updates on orderId.

select userId, count(orderId) from orders
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

3. Last 1 Hour tumbling window count (With upsert stream)
Now I tried combination of above two where input stream is converted to upsert stream (via lastValue aggregate function) and then Pending count needs to be calculated in last 1 hour window.

select userId, count(orderId) from
(select orderId, orderTime, lastValue(userId) as userId, lastValue(status) as status from orders group by orderId, orderTime)
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

This one gives me following error. Is this because I have added orderTime in group by/select clause and hence it's time characteristics have changed? What is the workaround here as without adding orderTime, I can not perform window aggregation on upsert stream.

[error] Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
[error]         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
[error]         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:89)
[error]         at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
[error]         at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
[error]         at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
[error]         at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
[error]         at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
[error]         at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
[error]         at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
[error]         at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
[error]         at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
[error]         at org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
[error]         at org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
[error]         at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:811)
[error]         at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
[error]         at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
[error]         at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)

Gagan

On Tue, Jan 22, 2019 at 7:01 PM Gagan Agrawal <[hidden email]> wrote:
Thanks Hequn for your response. I initially thought of trying out "over window" clause, however as per documentation there seems to be limitation in "orderBy" clause where it allows only single time event/processing time attribute. Whereas in my case events are getting generated from mysql bin log where I have seen multiple event updates getting generated with same timestamp (may be because they are part of same transaction) and hence will need bin log offset along with timestamp to be able to sort them correctly. So looks like can't use "over window" until it allows multiple columns in "orderBy". I am exploring option of creating my own window as you suggested to be more flexible.

Gagan

On Tue, Jan 22, 2019 at 7:23 AM Hequn Cheng <[hidden email]> wrote:
Hi Gagan,

> But I also have a requirement for event time based sliding window aggregation

Yes, you can achieve this with Flink TableAPI/SQL. However, currently, sliding windows don't support early fire, i.e., only output results when event time reaches the end of the window. Once window fires, the window state will be cleared and late data belonging to this window will be ignored. In order to wait for the late event, you can extract watermark with an offset from the timestamp. For example, make watermark = timestamp - 5min. 

If event time and early fire is a strong requirement in your scenarios, you can probably use an over window[1] to solve your problem, say an over window with 1h preceding. Over window outputs a result for each input.

If the above solutions can't meet your requirements, you can write a DataStream job in which define your own window logic[2].

Best, Hequn




On Tue, Jan 22, 2019 at 12:58 AM Gagan Agrawal <[hidden email]> wrote:
Thank you guys. It's great to hear multiple solutions to achieve this. I understand that records once emitted to Kafka can not be deleted and that's acceptable for our use case as last updated value should always be correct. However as I understand most of these solutions will work for global aggregation which was asked in original question. But I also have requirement for event time based sliding window aggregation where same order count needs to be maintained for past x hours window (sliding at say every 5 minutes). Is it possible to achieve with Table Api / SQL at the moment or will require some custom implementation?

For window based upsert stream, there can be few scenarios.

1. An update to record key comes in same window. E.g Pending (t1) -> Success (t2) happens in same window w1. In this case once window aggregation is triggered/emitted, such records will be counted as 0
2. An update to record key belongs to same window but arrives late. In this case old(and already emitted)  window (w1) needs to be re-emitted with decreased value.
3. An update to record key comes in different window. E.g Pending (t1) in window w1 and Success (t2) in w2. I think in this case it may not require to re-emit old window w1 as it represents pending count till that window time (w1) which is still valid as record moved to Success in next window w2 (based on event time).

Gagan


On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski <[hidden email]> wrote:
@Jeff: It depends if user can define a time window for his condition. As Gagan described his problem it was about “global” threshold of pending orders.



I have just thought about another solution that should work without any custom code. Converting “status” field to status_value int:
- "+1” for pending
- “-1” for success/failure
- “0” otherwise

Then running:

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

Query on top of such stream. Conversion to integers could be made by using `CASE` expression. 

One thing to note here is that probably all of the proposed solutions would work based on the order of the records, not based on the event_time.

Piotrek

On 21 Jan 2019, at 15:10, Jeff Zhang <[hidden email]> wrote:

I am thinking of another approach instead of retract stream. Is it possible to define a custom window to do this ? This window is defined for each order. And then you just need to analyze the events in this window.

Piotr Nowojski <[hidden email]> 于2019年1月21日周一 下午8:44写道:
Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction streams as the input (or conversions from append stream to retraction stream) at the moment. With that your problem would simplify to one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with related work [1], so this might be supported in the next couple of months.

There might a workaround at the moment that could work. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recent aggregated row. With that you could write a query like this:

SELECT 
uid, count(*) 
FROM (
SELECT 
FROM (
SELECT 
uid, LAST_ROW(status)
FROM
changelog
GROUP BY
uid, oid)
WHERE status = `pending`)
GROUP BY
uid

Where `changelog` is an append only stream with the following content:

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6


Besides that, you could also write your own a relatively simple Data Stream application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

On 18 Jan 2019, at 18:30, Gagan Agrawal <[hidden email]> wrote:

Hi,
I have a requirement and need to understand if same can be achieved with Flink retract stream. Let's say we have stream with 4 attributes userId, orderId, status, event_time where orderId is unique and hence any change in same orderId updates previous value as below

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in mysql)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u4, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for respective order ids)

What I need is to maintain count of "Pending" orders against a user and if they go beyond configured threshold, then push that user and pending count to Kafka. Here there can be multiple updates to order status e.g Pending -> Success or Pending -> Failed. Also in some cases there may not be any change in status but we may still get a row (may be due to some other attribute update which we are not concerned about). So is it possible to have running count in flink as below at respective event times. Here Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's order status was changed from Pending to Success. Similarly for user u2, at time t6, there was no change in running count as there was no change in status for order o4

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is decreased for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)

As I understand may be retract stream can achieve this. However I am not sure how. Any samples around this would be of great help.

Gagan




--
Best Regards

Jeff Zhang

Reply | Threaded
Open this post in threaded view
|

Re: Query on retract stream

Gagan Agrawal
Thanks Hequn for sharing those details. Looking forward for Blink integration.
I have one doubt around one of your earlier statements

> Also, currently, the window doesn't have the ability to handle retraction messages

When we use multi window (as you suggested), it is able to handle updates. So what does this statement really mean? Does that mean using multi window is just a work around as with single window it's not able to handle retraction messages?

Also wanted to confirm if tumbling window in Table/SQL api can handle late data (i.e data arriving after window has closed), do we have something similar to Datastream apj which has allowedLateness feature? You already mentioned that for sliding window it can not handle late data. But does that apply for Tumbling window as well?

One of the challenge in using unbounded aggregates in Table api is around state retention. As I understand only way to clear old state is via query config on idleTimeRetention. However it's a global parameter and not per aggregate parameter. So in my flink job, if I want mix of minute, hourly and daily aggregates, I will have to keep idleTimeRetention to minimum of day which means all minute level aggregations will also exist for entire day and hence would lead to increase in state size.

Gagan

Gagan



On Sun, Jan 27, 2019 at 9:42 AM Hequn Cheng <[hidden email]> wrote:
Hi Gagan,

Besides the eventime and proctime difference, there is another difference between the two ways. The window aggregate on bounded data, while unbounded aggregate on unbounded data, i.e., the new coming data can update a very old data. 

As for the performance, I think the two ways may have no big difference in current Flink version. Maybe you can run some tests between them on your own scenarios if both of them can solve your problem. FYI: There is a nice discussion[1] raised by Timo recently. Once Blink is merged into Flink, the unbounded aggregate will be much faster than the window.

Best,
Hequn



On Sat, Jan 26, 2019 at 4:11 PM Gagan Agrawal <[hidden email]> wrote:
Thanks Hequn for suggested solutions and I think this should really work and will give it a try. As I understand First solution  of using multiple windows will be good for those scenarios where I want output to be generated post window is materialized (i.e. watermark reaches end of window). And second will be good if I want it to be fired on per event basis (i.e no watermarking). Apart from this, do you see any difference from performance perspective in choosing between the two or both should be equally performant?

Gagan

On Sat, Jan 26, 2019 at 11:50 AM Hequn Cheng <[hidden email]> wrote:
Hi Gagan,

Time attribute fields will be materialized by the unbounded groupby. Also, currently, the window doesn't have the ability to handle retraction messages. I see two ways to solve the problem.

- Use multi-window.  The first window performs lastValue, the second performs count.
- Use two non-window aggregates. In this case, you don't have to change anything for the first aggregate. For the second one, you can group by an hour field and perform count(). The code looks like:

SELECT userId,
         count(orderId)
FROM 
    (SELECT orderId,
         getHour(orderTime) as myHour,
         lastValue(userId) AS userId,
         lastValue(status) AS status
    FROM orders
    GROUP BY  orderId, orderTime)
WHERE status='PENDING'
GROUP BY myHour, userId 

Best,
Hequn

 


On Sat, Jan 26, 2019 at 12:29 PM Gagan Agrawal <[hidden email]> wrote:
Based on the suggestions in this mail thread, I tried out few experiments on upsert stream with flink 1.7.1 and here is the issue I am facing with window stream.

1. Global Pending order count. 
Following query works fine and it's able to handle updates as per original requirement.

select userId, count(orderId) from
(select orderId, lastValue(userId) as userId, lastValue(status) as status from orders group by orderId)
where status='PENDING' group by userId

2. Last 1 Hour tumbling window count (Append stream)
Though following query doesn't handle upsert stream, I just tried to make sure time column is working fine. This is working, but as expected, it doesn't handle updates on orderId.

select userId, count(orderId) from orders
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

3. Last 1 Hour tumbling window count (With upsert stream)
Now I tried combination of above two where input stream is converted to upsert stream (via lastValue aggregate function) and then Pending count needs to be calculated in last 1 hour window.

select userId, count(orderId) from
(select orderId, orderTime, lastValue(userId) as userId, lastValue(status) as status from orders group by orderId, orderTime)
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

This one gives me following error. Is this because I have added orderTime in group by/select clause and hence it's time characteristics have changed? What is the workaround here as without adding orderTime, I can not perform window aggregation on upsert stream.

[error] Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
[error]         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
[error]         at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:89)
[error]         at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
[error]         at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
[error]         at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
[error]         at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
[error]         at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
[error]         at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
[error]         at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
[error]         at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
[error]         at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
[error]         at org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
[error]         at org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
[error]         at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:811)
[error]         at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
[error]         at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
[error]         at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)

Gagan

On Tue, Jan 22, 2019 at 7:01 PM Gagan Agrawal <[hidden email]> wrote:
Thanks Hequn for your response. I initially thought of trying out "over window" clause, however as per documentation there seems to be limitation in "orderBy" clause where it allows only single time event/processing time attribute. Whereas in my case events are getting generated from mysql bin log where I have seen multiple event updates getting generated with same timestamp (may be because they are part of same transaction) and hence will need bin log offset along with timestamp to be able to sort them correctly. So looks like can't use "over window" until it allows multiple columns in "orderBy". I am exploring option of creating my own window as you suggested to be more flexible.

Gagan

On Tue, Jan 22, 2019 at 7:23 AM Hequn Cheng <[hidden email]> wrote:
Hi Gagan,

> But I also have a requirement for event time based sliding window aggregation

Yes, you can achieve this with Flink TableAPI/SQL. However, currently, sliding windows don't support early fire, i.e., only output results when event time reaches the end of the window. Once window fires, the window state will be cleared and late data belonging to this window will be ignored. In order to wait for the late event, you can extract watermark with an offset from the timestamp. For example, make watermark = timestamp - 5min. 

If event time and early fire is a strong requirement in your scenarios, you can probably use an over window[1] to solve your problem, say an over window with 1h preceding. Over window outputs a result for each input.

If the above solutions can't meet your requirements, you can write a DataStream job in which define your own window logic[2].

Best, Hequn




On Tue, Jan 22, 2019 at 12:58 AM Gagan Agrawal <[hidden email]> wrote:
Thank you guys. It's great to hear multiple solutions to achieve this. I understand that records once emitted to Kafka can not be deleted and that's acceptable for our use case as last updated value should always be correct. However as I understand most of these solutions will work for global aggregation which was asked in original question. But I also have requirement for event time based sliding window aggregation where same order count needs to be maintained for past x hours window (sliding at say every 5 minutes). Is it possible to achieve with Table Api / SQL at the moment or will require some custom implementation?

For window based upsert stream, there can be few scenarios.

1. An update to record key comes in same window. E.g Pending (t1) -> Success (t2) happens in same window w1. In this case once window aggregation is triggered/emitted, such records will be counted as 0
2. An update to record key belongs to same window but arrives late. In this case old(and already emitted)  window (w1) needs to be re-emitted with decreased value.
3. An update to record key comes in different window. E.g Pending (t1) in window w1 and Success (t2) in w2. I think in this case it may not require to re-emit old window w1 as it represents pending count till that window time (w1) which is still valid as record moved to Success in next window w2 (based on event time).

Gagan


On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski <[hidden email]> wrote:
@Jeff: It depends if user can define a time window for his condition. As Gagan described his problem it was about “global” threshold of pending orders.



I have just thought about another solution that should work without any custom code. Converting “status” field to status_value int:
- "+1” for pending
- “-1” for success/failure
- “0” otherwise

Then running:

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

Query on top of such stream. Conversion to integers could be made by using `CASE` expression. 

One thing to note here is that probably all of the proposed solutions would work based on the order of the records, not based on the event_time.

Piotrek

On 21 Jan 2019, at 15:10, Jeff Zhang <[hidden email]> wrote:

I am thinking of another approach instead of retract stream. Is it possible to define a custom window to do this ? This window is defined for each order. And then you just need to analyze the events in this window.

Piotr Nowojski <[hidden email]> 于2019年1月21日周一 下午8:44写道:
Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction streams as the input (or conversions from append stream to retraction stream) at the moment. With that your problem would simplify to one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with related work [1], so this might be supported in the next couple of months.

There might a workaround at the moment that could work. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recent aggregated row. With that you could write a query like this:

SELECT 
uid, count(*) 
FROM (
SELECT 
FROM (
SELECT 
uid, LAST_ROW(status)
FROM
changelog
GROUP BY
uid, oid)
WHERE status = `pending`)
GROUP BY
uid

Where `changelog` is an append only stream with the following content:

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6


Besides that, you could also write your own a relatively simple Data Stream application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

On 18 Jan 2019, at 18:30, Gagan Agrawal <[hidden email]> wrote:

Hi,
I have a requirement and need to understand if same can be achieved with Flink retract stream. Let's say we have stream with 4 attributes userId, orderId, status, event_time where orderId is unique and hence any change in same orderId updates previous value as below

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in mysql)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u4, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for respective order ids)

What I need is to maintain count of "Pending" orders against a user and if they go beyond configured threshold, then push that user and pending count to Kafka. Here there can be multiple updates to order status e.g Pending -> Success or Pending -> Failed. Also in some cases there may not be any change in status but we may still get a row (may be due to some other attribute update which we are not concerned about). So is it possible to have running count in flink as below at respective event times. Here Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's order status was changed from Pending to Success. Similarly for user u2, at time t6, there was no change in running count as there was no change in status for order o4

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is decreased for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)

As I understand may be retract stream can achieve this. However I am not sure how. Any samples around this would be of great help.

Gagan




--
Best Regards

Jeff Zhang