Define rowtime on intermediate table field

Define rowtime on intermediate table field

Sumeet Malhotra
Hi,

My use case involves reading raw data records from Kafka and processing them. The records come from a database, where a periodic job reads new rows, packages them into a single JSON object (shown below), and writes the entire record to Kafka.

{
    "id": "some_id",
    "key_a": "value_a",
    "key_b": "value_b",
    "result": {
        "columns": [
            "col_a",
            "col_b",
            "col_c",
            "col_d"
        ],
        "rows": [
            ["2021-05-04T05:23:13.953610Z", "655361", "8013", "0"],
            ["2021-05-04T05:23:13.953610Z", "655362", "4000", "456"],
            ["2021-05-04T05:23:13.953610Z", "655363", "20000", "562"],
            ...
            ...
        ]
    }
}

As can be seen, the row time is embedded inside the `result` object.

What I'm doing at the moment is running this data through a user-defined table function, which parses the `result` object as a string and emits multiple rows that include the timestamp field. This is working fine.
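
Sketched roughly, the UDTF looks something like the following (the class name, output column names, and the Jackson-based parsing here are illustrative placeholders, not my exact code):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Explodes the `result` JSON string into one row per entry of its
// `rows` array, with the event timestamp as the first column.
@FunctionHint(output = @DataTypeHint("ROW<ts TIMESTAMP(3), col_b STRING, col_c STRING, col_d STRING>"))
public class ParseResult extends TableFunction<Row> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public void eval(String resultJson) {
        try {
            JsonNode rows = MAPPER.readTree(resultJson).get("rows");
            for (JsonNode r : rows) {
                // the first array element carries the event time,
                // e.g. "2021-05-04T05:23:13.953610Z"
                LocalDateTime ts = OffsetDateTime.parse(r.get(0).asText())
                        .withOffsetSameInstant(ZoneOffset.UTC)
                        .toLocalDateTime();
                collect(Row.of(ts, r.get(1).asText(), r.get(2).asText(), r.get(3).asText()));
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse result payload", e);
        }
    }
}

It's registered with tEnv.createTemporarySystemFunction("parse_result", ParseResult.class) and applied to the raw table via a lateral join.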

In the next step, I want to perform windowing on this transformed data. That requires defining the event-time attribute along with a watermark. As I understand it, this can be done either in the initial table DDL or during conversion to a DataStream.
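
For a top-level timestamp column, the DDL route would look roughly like this (table name, columns, and connector options are illustrative), but it doesn't apply here because my timestamp is nested inside `result`:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DdlWatermarkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Rowtime and watermark declared directly in the DDL -- this only
        // works when the event-time column is a top-level field.
        tEnv.executeSql(
                "CREATE TABLE raw_events (" +
                "  id STRING," +
                "  ts TIMESTAMP(3)," +
                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'raw_events'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'format' = 'json'" +
                ")");
    }
}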

Since I extract the timestamp value only after reading from Kafka, how can I define an event-time attribute on the intermediate table, which is essentially the output of the user-defined table function?

The only solution I can think of at the moment is to write the intermediate table back to Kafka and then create a new consumer that reads it, where I can define the event-time attribute as part of the DDL. That most likely won't be good performance-wise. Is there any other way to define event time on the results of my user-defined table function?

Thanks in advance,
Sumeet

Re: Define rowtime on intermediate table field

Yun Gao
Hi Sumeet,

I think you could first convert the table to a DataStream [1],
then assign the timestamp and watermark with `assignTimestampsAndWatermarks(...)`,
and then convert it back to a table [2].
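
For example, roughly like this (the column names, the physical timestamp type, and the out-of-orderness bound are only placeholders):

import static org.apache.flink.table.api.Expressions.$;

import java.sql.Timestamp;
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RowtimeRoundTrip {

    // Takes the UDTF output table (ts, col_b, col_c, col_d) and returns a
    // table whose `ts` column is a proper event-time attribute.
    static Table defineRowtime(StreamTableEnvironment tEnv, Table intermediate) {
        DataStream<Row> stream =
                tEnv.toAppendStream(intermediate, Row.class)
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                        // field 0 holds the extracted timestamp
                                        .withTimestampAssigner((row, prev) ->
                                                ((Timestamp) row.getField(0)).getTime()));

        // $("ts").rowtime() re-declares the field as the event-time attribute,
        // backed by the record timestamps assigned above.
        return tEnv.fromDataStream(stream, $("ts").rowtime(), $("col_b"), $("col_c"), $("col_d"));
    }
}

After that, windows can be defined on `ts` as usual.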

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/common/#convert-a-table-into-a-datastream
[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#during-datastream-to-table-conversion

