Hi,

My use case involves reading raw data records from Kafka and processing them. The records come from a database, where a periodic job reads new rows, packages them into a single JSON object (as described below) and writes the entire record to Kafka.

{
  'id': 'some_id',
  'key_a': 'value_a',
  'key_b': 'value_b',
  'result': {
    'columns': [ 'col_a', 'col_b', 'col_c', 'col_d' ],
    'rows': [
      ['2021-05-04T05:23:13.953610Z', '655361', '8013', '0'],
      ['2021-05-04T05:23:13.953610Z', '655362', '4000', '456'],
      ['2021-05-04T05:23:13.953610Z', '655363', '20000', '562'],
      ... ...
    ]
  }
}

As can be seen, the row time is embedded in the `result` object.

At the moment I run this data through a user-defined table function, which parses the `result` object as a string and emits multiple rows that include the timestamp field. This is working fine.

In the next step, I want to perform windowing on this transformed data. That requires defining the event time attribute along with the watermark. As I understand it, this can be done either in the initial table DDL or during conversion from a DataStream.

Since I extract the timestamp value only after reading from Kafka, how can I define an event time attribute on the intermediate table that is the result of the user-defined table function?

The only solution I can think of at the moment is to write the intermediate table back to Kafka and then create a new consumer that reads from Kafka, where I can define the event time attribute as part of its DDL. This most likely won't perform well. Is there any other way to define event time on the results of my user-defined table function?

Thanks in advance,
Sumeet

Hi Sumeet,

I think you could first convert the table to a DataStream [1], then define the timestamp and watermark with `assignTimestampsAndWatermarks(...)`, and then convert it back to a table [2].

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/common/#convert-a-table-into-a-datastream
[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#during-datastream-to-table-conversion
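
For readers following the thread, here is a minimal plain-Python sketch of the per-row logic discussed above: flattening the `result` object into one row per entry and deriving an epoch-millisecond event time from the embedded timestamp. It does not use the Flink API. The function names `explode_result` and `event_time_millis` are hypothetical, and it assumes that `col_a` is the column carrying the row time and that the record has already been parsed into a Python dict. In a Flink job, the timestamp extraction would presumably sit in the timestamp assigner supplied alongside `assignTimestampsAndWatermarks(...)`.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def explode_result(record):
    """Yield one dict per entry in record['result']['rows'], keyed by column name."""
    columns = record["result"]["columns"]
    for row in record["result"]["rows"]:
        yield dict(zip(columns, row))


def event_time_millis(iso_ts):
    """Convert a UTC timestamp like '2021-05-04T05:23:13.953610Z' to epoch milliseconds."""
    parsed = datetime.strptime(iso_ts, "%Y-%m-%dT%H:%M:%S.%fZ")
    parsed = parsed.replace(tzinfo=timezone.utc)
    # Integer division by a timedelta avoids floating-point rounding.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


# Sample record modelled on the question; the elided rows are left out.
sample = {
    "id": "some_id",
    "key_a": "value_a",
    "key_b": "value_b",
    "result": {
        "columns": ["col_a", "col_b", "col_c", "col_d"],
        "rows": [
            ["2021-05-04T05:23:13.953610Z", "655361", "8013", "0"],
            ["2021-05-04T05:23:13.953610Z", "655362", "4000", "456"],
            ["2021-05-04T05:23:13.953610Z", "655363", "20000", "562"],
        ],
    },
}

rows = list(explode_result(sample))
# Assumption: col_a is the event-time column.
timestamps = [event_time_millis(r["col_a"]) for r in rows]
```

Each element of `timestamps` is the value a timestamp assigner would return for the corresponding row; the watermark strategy itself (for example, how much out-of-orderness to tolerate) is a separate choice that depends on how the periodic database job batches its rows.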