Expressing Flink array aggregation using Table / SQL API


Piyush Narang

Hi folks,

 

I’m getting started with Flink and trying to figure out how to express aggregating some rows into an array so that I can eventually sink the data into an AppendStreamTableSink.

My data looks something like this:

userId, clientId, eventType, timestamp, dataField

 

I need to compute some custom aggregations using a UDAF while grouping by userId, clientId over a sliding window (10 mins, triggered every 1 min). My first attempt is:

SELECT userId, clientId, my_aggregation(eventType, `timestamp`, dataField) as custom_aggregated
FROM my_kafka_stream_table
GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)

 

This query works as I expect: in every time window I end up with inserts for each unique userId + clientId combination. What I want, though, is to generate a single row per userId in each time window, and that is what I’m struggling to express given the restriction that I want to sink the result to an AppendStreamTableSink. I was hoping to do something like this:

 

SELECT userId, COLLECT(client_custom_aggregated)
FROM
(
  SELECT userId, MAP[clientId, my_aggregation(eventType, `timestamp`, dataField)] as client_custom_aggregated
  FROM my_kafka_stream_table
  GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
) GROUP BY userId

 

Unfortunately, when I try this (and a few other variants), I run into the error “AppendStreamTableSink requires that Table has only insert changes”. Does anyone know if there’s a way to compute my COLLECT aggregation so that it produces one row per userId per time window?
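
In case it helps, here is roughly the shape of my_aggregation. The accumulate logic below is just a stand-in rather than my real aggregation, and the parameter types are a guess at what Flink passes in:

import java.sql.Timestamp;
import org.apache.flink.table.functions.AggregateFunction;

// Placeholder UDAF: counts events and remembers the dataField of the latest event.
public class MyAggregation extends AggregateFunction<String, MyAggregation.Acc> {

    // Mutable accumulator; Flink keeps one per group and window.
    public static class Acc {
        public long count;
        public long latestTs;
        public String latestData;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // Called once per input row with the columns listed in the SQL call.
    public void accumulate(Acc acc, String eventType, Timestamp ts, String dataField) {
        acc.count++;
        if (ts.getTime() >= acc.latestTs) {
            acc.latestTs = ts.getTime();
            acc.latestData = dataField;
        }
    }

    @Override
    public String getValue(Acc acc) {
        return acc.count + ":" + acc.latestData;
    }
}

I register it before running the query with tableEnv.registerFunction("my_aggregation", new MyAggregation()), where tableEnv is the StreamTableEnvironment.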

 

Thanks,

 

-- Piyush

 


Re: Expressing Flink array aggregation using Table / SQL API

Kurt Young
Hi Piyush, 

Could you try adding clientId into your aggregate function, tracking a map of <clientId, your_original_aggregation> inside the new aggregate function, and assembling whatever result you need when it emits? The SQL would look like:

SELECT userId, some_aggregation(clientId, eventType, `timestamp`, dataField)
FROM my_kafka_stream_table
GROUP BY userId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
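
Roughly, the wrapping aggregate function could look like the sketch below. All the names are made up, and the per-client logic is only a placeholder for your original aggregation:

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.AggregateFunction;

// Keeps one partial aggregate per clientId and emits the assembled map when the window fires.
public class PerClientAggregation
        extends AggregateFunction<Map<String, String>, PerClientAggregation.Acc> {

    public static class Acc {
        // clientId -> per-client partial aggregate (placeholder: a simple event count).
        public Map<String, Long> perClient = new HashMap<>();
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // One call per row; clientId is now an argument instead of a GROUP BY key.
    public void accumulate(Acc acc, String clientId, String eventType, Timestamp ts, String dataField) {
        acc.perClient.merge(clientId, 1L, Long::sum);
    }

    @Override
    public Map<String, String> getValue(Acc acc) {
        // Assemble whatever per-client result you need when the window emits.
        Map<String, String> result = new HashMap<>();
        acc.perClient.forEach((client, count) -> result.put(client, Long.toString(count)));
        return result;
    }

    @Override
    public TypeInformation<Map<String, String>> getResultType() {
        // Declare the emitted type so the planner treats it as MAP<VARCHAR, VARCHAR>.
        return Types.MAP(Types.STRING, Types.STRING);
    }
}

You would register it as some_aggregation and keep the query above unchanged.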

Kurt


Re: Expressing Flink array aggregation using Table / SQL API

Piyush Narang

Thanks for getting back, Kurt. Yeah, this might be an option to try out. I was hoping there would be a way to express this directly in the SQL, though.

 

-- Piyush

 

 

Re: Expressing Flink array aggregation using Table / SQL API

Kurt Young
Hi Piyush, 

I think your second SQL is correct, but the problem you have encountered is that the outer aggregation (GROUP BY userId with COLLECT(client_custom_aggregated)) emits an updated result immediately whenever it receives a new row from the inner windowed aggregation. Hence Flink needs the sink to either:
1. be able to retract previously emitted results, i.e. be a `RetractStreamTableSink`, or
2. have something like a primary key and update results by key; in your case userId would be the key.

Since you are emitting the result to an `AppendStreamTableSink`, which can do neither, that is why you see that error.
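
If you want to see those update messages, a minimal sketch is to convert the result to a retract stream instead of writing it straight to the append sink (this assumes `tableEnv` is your StreamTableEnvironment and `resultTable` holds your second query; both names are placeholders, not from your code):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Each element carries a flag: true = insert/update for a userId,
// false = retraction of the row previously emitted for that userId.
DataStream<Tuple2<Boolean, Row>> changelog = tableEnv.toRetractStream(resultTable, Row.class);

// Printing makes the accumulate/retract pairs visible; an AppendStreamTableSink
// has no way to represent the retractions, which is where the error comes from.
changelog.print();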

Best,
Kurt


Re: Expressing Flink array aggregation using Table / SQL API

Piyush Narang

Hi Kurt,

 

Thanks for getting back and explaining this. The behavior in this case makes more sense now after your explanation plus reading the dynamic tables article. I was able to hook up the scoped aggregation like you suggested, so I have a workaround for now. The part I’m still trying to figure out is whether there’s any way to express the query I had so that it can sink to an append sink (apart from this custom aggregation). I tried including the time window in the outer query as well, but I was running into errors there. Or, in such scenarios, would you typically go the route of either a retractable sink or a sink that can update partial results by key?

 

Thanks,

 

-- Piyush

 

 

Re: Expressing Flink array aggregation using Table / SQL API

Kurt Young
Another option we have used before is to create a Retract/Upsert sink directly on top of the current Kafka sink and give it the option to simply drop the “delete” messages it receives. You could then do the dedup work in the downstream jobs that consume this result.
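
Roughly, the same dropping idea can also be sketched on top of toRetractStream rather than a hand-written TableSink (the topic name, serialization and Kafka properties are placeholders, and depending on your Flink version the producer class may be FlinkKafkaProducer011 or similar):

import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.types.Row;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

tableEnv.toRetractStream(resultTable, Row.class)
    // Keep only the accumulate messages and silently drop the retractions
    // ("delete" messages); downstream consumers then dedup per userId.
    .filter(change -> change.f0)
    .map(new MapFunction<Tuple2<Boolean, Row>, String>() {
        @Override
        public String map(Tuple2<Boolean, Row> change) {
            return change.f1.toString(); // placeholder serialization of the row
        }
    })
    .addSink(new FlinkKafkaProducer<>("user_aggregations", new SimpleStringSchema(), props));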

Best,
Kurt

