How to output a Table to Kafka?

chi ma
Hi,

I'm a newbie to Flink. I'm trying to load data from HDFS and analyze it using the Flink Table API.

I created a TableSource, registered it in the StreamTableEnvironment as a table, ran a SQL query on that table through streamTableEnvironment.sqlQuery, and finally called writeToSink to write the result to a Kafka010JsonTableSink.

But I got an exception: "Exception in thread 'main' org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes."

I've googled it. The probable cause is that I put a "group by" clause in the SQL statement, which turns the result into an updating (retract) table rather than an append-only one.

If I pass a SQL statement without the "group by" clause to sqlQuery, everything works and I can see the result in the Kafka topic.
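To see why only the "group by" version is rejected, here is a toy model in plain Java (not Flink code; the class and method names are hypothetical). A projection emits each result row once and never touches it again, while a running `COUNT(*) ... GROUP BY` must revise a count it has already emitted whenever the same key arrives again. An append-only sink can only accept the first kind of change stream. Flink makes this decision from the translated query plan rather than by inspecting the data at runtime as this sketch does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AppendOnlyCheck {

    // INSERT adds a new result row; UPDATE revises a row that was already emitted.
    enum Kind { INSERT, UPDATE }

    static final class Change {
        final Kind kind;
        final String row;
        Change(Kind kind, String row) { this.kind = kind; this.row = row; }
        @Override public String toString() { return kind + " " + row; }
    }

    // Models "SELECT word FROM words": every input row yields exactly one new result row.
    static List<Change> projection(List<String> words) {
        List<Change> out = new ArrayList<>();
        for (String w : words) {
            out.add(new Change(Kind.INSERT, w));
        }
        return out;
    }

    // Models "SELECT word, COUNT(*) FROM words GROUP BY word":
    // a repeated key changes the count of a row that was already emitted.
    static List<Change> groupByCount(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String w : words) {
            Long old = counts.get(w);
            long now = (old == null) ? 1L : old + 1L;
            counts.put(w, now);
            out.add(new Change(old == null ? Kind.INSERT : Kind.UPDATE, w + "," + now));
        }
        return out;
    }

    // An append-only sink only accepts change streams that never revise earlier rows.
    static boolean isAppendOnly(List<Change> changes) {
        for (Change c : changes) {
            if (c.kind != Kind.INSERT) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> words = List.of("flink", "kafka", "flink");
        System.out.println(projection(words));   // [INSERT flink, INSERT kafka, INSERT flink]
        System.out.println(groupByCount(words)); // [INSERT flink,1, INSERT kafka,1, UPDATE flink,2]
        System.out.println(isAppendOnly(projection(words)));   // true
        System.out.println(isAppendOnly(groupByCount(words))); // false
    }
}
```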

So, my questions are:
   1. What is the usual way to output a retract table?
   2. Is there a unified way to output both append-only and non-append-only tables?

Thanks.
Best Regards~

Re: How to output a Table to Kafka?

Hequn Cheng
Hi chi ma,

A query with "group by" produces an updating table. Both UpsertStreamTableSink and RetractStreamTableSink can be used to emit an updating table. The main difference between them is how an UPDATE change is encoded: RetractStreamTableSink encodes it as two messages, a retract message for the updated (previous) row and an add message for the updating (new) row, while UpsertStreamTableSink encodes it as a single message and is hence more efficient. More details can be found here [1][2].
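A minimal sketch of the two encodings, as plain Java rather than Flink code (the class, method names, and string message format are hypothetical). It feeds the same running word count through both encodings. For the input `flink, kafka, flink`, retract encoding needs four messages because the second `flink` becomes a retraction of `flink,1` plus an add of `flink,2`, while upsert encoding needs three, since the message for key `flink` simply overwrites the earlier row. As far as I recall, in the Flink 1.4/1.5 API both sink interfaces consume `Tuple2<Boolean, T>` records, with the Boolean flag carrying the add/retract (or upsert/delete) meaning that the `true`/`false` prefix models here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractVsUpsert {

    // Retract encoding: (true, row) adds a row, (false, row) retracts a previously added row.
    static List<String> retractEncode(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String w : words) {
            Long old = counts.get(w);
            long now = (old == null) ? 1L : old + 1L;
            counts.put(w, now);
            if (old != null) {
                // An update becomes two messages: first retract the previous row...
                out.add("(false, " + w + "," + old + ")");
            }
            // ...then add the new row.
            out.add("(true, " + w + "," + now + ")");
        }
        return out;
    }

    // Upsert encoding: each message carries the key, and a message for an existing
    // key overwrites the previous row, so an update is a single message.
    static List<String> upsertEncode(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String w : words) {
            long now = counts.merge(w, 1L, Long::sum);
            out.add("(true, key=" + w + ", " + w + "," + now + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = List.of("flink", "kafka", "flink");
        // [(true, flink,1), (true, kafka,1), (false, flink,1), (true, flink,2)]
        System.out.println(retractEncode(words));
        // [(true, key=flink, flink,1), (true, key=kafka, kafka,1), (true, key=flink, flink,2)]
        System.out.println(upsertEncode(words));
    }
}
```

The upsert form is cheaper only because the sink can look rows up by key; without a unique key, the retract form is the one that still works.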

Both UpsertStreamTableSink and RetractStreamTableSink can be used to emit either an append-only table or an updating table, whereas AppendStreamTableSink can only be used to emit an append-only table.
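Because an append-only table is just a retract stream that never contains retraction messages, one retract-style consumer can handle both kinds of table. A minimal sketch in plain Java with hypothetical names (not a Flink sink implementation) that materializes a retract stream into a multiset of rows and is fed both a GROUP BY result and a plain projection:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractConsumer {

    // One retract-encoded message: isAdd == true adds the row, false retracts it.
    static final class Msg {
        final boolean isAdd;
        final String row;
        Msg(boolean isAdd, String row) { this.isAdd = isAdd; this.row = row; }
    }

    // Materializes a retract stream as a multiset of rows (row -> multiplicity).
    static Map<String, Integer> materialize(List<Msg> messages) {
        Map<String, Integer> table = new HashMap<>();
        for (Msg m : messages) {
            if (m.isAdd) {
                table.merge(m.row, 1, Integer::sum);
            } else {
                // Decrement the multiplicity; returning null drops the row when it reaches zero.
                table.computeIfPresent(m.row, (r, n) -> n > 1 ? n - 1 : null);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Updating table (GROUP BY count): the old row "flink,1" is retracted.
        List<Msg> updating = List.of(
            new Msg(true, "flink,1"), new Msg(true, "kafka,1"),
            new Msg(false, "flink,1"), new Msg(true, "flink,2"));
        // Append-only table (plain projection): every message is an add.
        List<Msg> appendOnly = List.of(
            new Msg(true, "flink"), new Msg(true, "kafka"), new Msg(true, "flink"));
        System.out.println(materialize(updating));
        System.out.println(materialize(appendOnly));
    }
}
```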



In reply to chi ma's message of Mon, Apr 9, 2018 at 2:43 PM.