UpsertStreamTableSink for Aggregate Only Query


Satyam Shekhar
Hello,

I am using Flink as the query engine for an alerting/monitoring application. One of the use cases in our product requires continuously tracking and charting the output of an aggregate-only SQL query, for example, `select sum(revenue) from lineorder`. A desirable property of the Flink job's output for such a query is that the result set always contains exactly one row (i.e., the row count never drops to zero due to retractions of previous output). In other words, I need upsert-"like" semantics for the output of the query.

I was hopeful after reading the comments in UpsertStreamTableSink.java that this case is accounted for in the implementation; however, a pipeline writing the above query to a concrete UpsertStreamTableSink fails with the following error: "UpsertStreamTableSink requires that Table has a full primary keys if it is updated." Here are the relevant comments from UpsertStreamTableSink.java for reference:

```
Configures the unique key fields of the {@link Table} to write. The method is called after {@link TableSink#configure(String[], TypeInformation[])}.

<p>The keys array might be empty, if the table consists of a single (updated) record. If the table does not have a key and is append-only, the keys attribute is null.

@param keys the field names of the table's keys, an empty array if the table has a single row, and null if the table is append-only and has no key.
void setKeyFields(String[] keys);
```

The code in StreamExec(Legacy)Sink.scala appears to match the observed failure and contradicts the javadoc comment about an "empty key array if the table consists of a single record".

With that context, I have the following questions:

1. Is UpsertStreamTableSink expected to consume the output of such aggregate-only queries? Or is my interpretation of the code and comment wrong, and have I misconfigured UpsertStreamTableSink?
2. If the answer to (1) is no, are there any recommended patterns for solving this use case such that the client never observes an empty result set for the output of this query?

Regards,
Satyam

Re: UpsertStreamTableSink for Aggregate Only Query

Arvid Heise-3
Hi Satyam,

You are right, there seems to be a disconnect between the javadoc and the implementation. Jark probably knows more.

In your case, couldn't you just add a dummy column containing a constant key?

```
select 'revenue' AS name, sum(revenue) from lineorder
```

and then set the dummy field as the PK?
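For illustration only, here is a plain-Java sketch (no Flink dependencies; the class and method names are made up) of why a constant key gives the desired behavior: every update targets the same row of the external table, so the result set never drops to zero rows:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: an upsert keyed on the constant 'revenue' name column.
// Each new aggregate value overwrites the previous row instead of leaving a
// retraction gap.
public class ConstantKeyUpsert {
    static Map<String, Double> table = new LinkedHashMap<>();

    static void upsert(String key, double value) {
        table.put(key, value); // same constant key -> always exactly one row
    }

    public static void main(String[] args) {
        upsert("revenue", 100.0); // first sum(revenue) result
        upsert("revenue", 250.0); // updated result overwrites the old row
        System.out.println(table.size() + " row(s): " + table.get("revenue"));
    }
}
```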



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: UpsertStreamTableSink for Aggregate Only Query

Satyam Shekhar
Hey Arvid,

Thanks for the reply. 

As you suggested, rewriting the query to add a dummy output column and group-by clause - "select 1, sum(revenue) from lineorder group by 1" - does add a unique key column to the output, and the pipeline succeeds.

However, the application may receive arbitrary SQL from the upstream server. This makes the solution tricky - I'd have to rewrite the query to add a dummy grouping key to every grouping node and a projection node to drop the dummy key from the output. I can try to account for this upstream (in the query generation layer), but I would prefer to have it solved within the execution engine itself.

Regards,
Satyam


Re: UpsertStreamTableSink for Aggregate Only Query

Arvid Heise-3
Instead of changing the query, I have embedded the query in a larger context for similar use cases.

So if you get an arbitrary query X that produces exactly one result (e.g., X = select sum(revenue) from lineorder group by 1), you can craft a query that adds a dummy pk to the result:

```
Table original = env.sqlQuery(X);
Table withDummy = original.select("'dummy' as pk, *");
```


Re: UpsertStreamTableSink for Aggregate Only Query

Jark Wu-3
Hi Satyam, 

Currently, `UpsertStreamTableSink` requires the query to have a primary key, and that key is passed to `UpsertStreamTableSink#setKeyFields`.
If there is no primary key in the query, an error is thrown, as you have seen.

This should work for all group-by queries (provided no projection after the aggregation drops the group-by keys).
Global aggregation is special: it doesn't have a primary key. But an upsert sink requires a primary key; otherwise it doesn't know which row to update.
How would you write such a result into an external database without a primary key? Would you write rows in append fashion, or in remove-insert fashion?
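To make the two strategies concrete, here is a plain-Java sketch (hypothetical, not Flink API) of writing a keyless single-row result either way:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of writing a keyless global-aggregate result.
// Append keeps every emitted value; remove-insert atomically replaces the
// previous row, so the destination always holds exactly one row.
public class KeylessWrite {
    static List<Double> appendRows(double[] updates) {
        List<Double> rows = new ArrayList<>();
        for (double u : updates) rows.add(u); // append fashion: rows accumulate
        return rows;
    }

    static List<Double> removeInsertRows(double[] updates) {
        List<Double> rows = new ArrayList<>();
        for (double u : updates) {
            rows.clear(); // remove the previous result...
            rows.add(u);  // ...and insert the new one in the same step
        }
        return rows;
    }

    public static void main(String[] args) {
        double[] updates = {100.0, 250.0, 400.0};
        System.out.println("append: " + appendRows(updates).size() + " rows");
        System.out.println("remove-insert: " + removeInsertRows(updates).size() + " row");
    }
}
```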

Best,
Jark



Re: UpsertStreamTableSink for Aggregate Only Query

Satyam Shekhar
Hi Jark,

I wish to atomically update the destination using remove-insert. To pick that strategy, I need some "hint" from Flink that the output is a global aggregation with no grouping key and that new rows should overwrite the previous value.

I am also exploring handling the issue in the upstream server (in the query generation layer), where I have this knowledge from context (similar to what Arvid suggested). I may be able to get around this problem by handling it upstream.

Regards,
Satyam


Re: UpsertStreamTableSink for Aggregate Only Query

Jark Wu-3
Hi Satyam,

I guess your destination database table doesn't have a primary key, right?
If this is the case, I think the upcoming 1.11 release with the new sink interface (FLIP-95) can resolve this better.

In the new sink interface:
- the primary key is always defined in the Flink SQL DDL
- the planner no longer infers or validates the primary key of the query
- you can tell whether the query contains UPDATE/DELETE changes or is INSERT-only via the parameter of `DynamicTableSink#getChangelogMode(queryChangeMode)`

So if the `queryChangeMode` contains UPDATE changes and the DDL doesn't declare any PK, you can set a flag in your sink to indicate it should work in "remove-insert" mode.
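The decision described above could be sketched like this (plain Java; the enum, method, and mode names are made up for illustration and are not the FLIP-95 API):

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of choosing a sink write mode from the query's
// changelog mode and whether the DDL declares a primary key.
public class SinkModeChooser {
    enum RowKind { INSERT, UPDATE_AFTER, DELETE }

    static String chooseMode(Set<RowKind> queryChangeMode, boolean ddlHasPrimaryKey) {
        boolean hasUpdates = queryChangeMode.contains(RowKind.UPDATE_AFTER)
                || queryChangeMode.contains(RowKind.DELETE);
        if (!hasUpdates) return "append";                     // insert-only query
        return ddlHasPrimaryKey ? "upsert" : "remove-insert"; // keyless updates
    }

    public static void main(String[] args) {
        Set<RowKind> mode = EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER);
        System.out.println(chooseMode(mode, false)); // global aggregate, no PK
    }
}
```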

Best,
Jark
