Rowtime

Gregory Fee
Hello! I have found that even though I am processing with event time and provide an event time for all my events, the events in the RetractStream I create from a Table do not have timestamps. That is, when I put a ProcessFunction on the RetractStream, ctx.timestamp() always returns null. I went a step further and defined the table to include a rowtime column, but that doesn't seem to make any difference. My SQL on the Table is roughly just:

select user, count(item) from table group by user

Am I missing something? Is there any way to get a reasonable event timestamp on the events in the retract stream?

--
Gregory Fee
Engineer
425.830.4734
Lyft

Re: Rowtime

Fabian Hueske-2
Hi Gregory,

Event-time timestamps are handled a bit differently in Flink's SQL compared to the DataStream API.
In the DataStream API, timestamps are hidden from the user and implicitly used for time-based operations such as windows.
In SQL, the query semantics cannot depend on hidden fields. Therefore, all time fields (event time as well as processing time) must be part of the schema of a table.
Internally, the SQL operators remove the internal DataStream timestamp and move it into the record.
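To make the "move it into the record" step concrete, here is a toy Java model (it uses records, so Java 16 or later) in which a stream element carries its timestamp as hidden metadata and the conversion copies it into an explicit rowtime column. StreamElement, Row and toRows are hypothetical names chosen for illustration; this is not how Flink's internal classes are actually organized.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical names, not Flink classes): a hidden stream timestamp
// becomes an ordinary, visible column of a table row.
public class TimestampIntoRow {

    // Stand-in for a DataStream element: payload plus a timestamp user code does not see as a field.
    record StreamElement(String user, String item, Long hiddenTimestamp) {}

    // Stand-in for a table row: the former timestamp is now the "rowtime" column of the schema.
    record Row(String user, String item, long rowtime) {}

    // Copies each element's hidden timestamp into the row itself.
    static List<Row> toRows(List<StreamElement> elements) {
        List<Row> rows = new ArrayList<>();
        for (StreamElement e : elements) {
            if (e.hiddenTimestamp() == null) {
                // Without an event-time timestamp there is nothing to put into the rowtime column.
                throw new IllegalArgumentException("element has no event-time timestamp: " + e);
            }
            rows.add(new Row(e.user(), e.item(), e.hiddenTimestamp()));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Row> rows = toRows(List.of(
                new StreamElement("alice", "book", 1000L),
                new StreamElement("bob", "pen", 2000L)));
        rows.forEach(System.out::println);
    }
}
```

Once the timestamp is a column, any query that wants to keep it has to select it explicitly, just like any other field.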

When a Table is converted into a DataStream, one of three things may happen:
1) the Table has no event-time field: the resulting DataStream will not have a record timestamp. This is your case.
2) the Table has exactly one event-time field: the record timestamp of the resulting DataStream will be set from that event-time field.
3) the Table has more than one event-time field: the conversion fails, and you need to cast all but one of the event-time fields into regular TIMESTAMP fields.
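The three cases can be sketched as a small decision function over a table schema. This is a toy model that assumes a schema is just a list of typed columns; Field, FieldType and timestampFieldIndex are hypothetical names, not Flink API, and the real conversion also deals with watermarks and type information that the sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Toy model of the three Table-to-DataStream cases (hypothetical names, not Flink API).
public class TableToStreamTimestamps {

    // ROWTIME marks an event-time attribute; TIMESTAMP is a regular timestamp column.
    enum FieldType { STRING, BIGINT, TIMESTAMP, ROWTIME }

    record Field(String name, FieldType type) {}

    // Returns the index of the column that would supply the record timestamp:
    // empty if there is no event-time column, the index if there is exactly one,
    // and an exception if there are several.
    static Optional<Integer> timestampFieldIndex(List<Field> schema) {
        List<Integer> rowtimeColumns = new ArrayList<>();
        for (int i = 0; i < schema.size(); i++) {
            if (schema.get(i).type() == FieldType.ROWTIME) {
                rowtimeColumns.add(i);
            }
        }
        if (rowtimeColumns.isEmpty()) {
            return Optional.empty();                   // case 1: no record timestamp
        }
        if (rowtimeColumns.size() == 1) {
            return Optional.of(rowtimeColumns.get(0)); // case 2: timestamp taken from this column
        }
        // case 3: ambiguous, so the conversion is rejected
        throw new IllegalStateException("more than one event-time column at " + rowtimeColumns);
    }

    public static void main(String[] args) {
        // Result of "select user, count(item) ... group by user": the rowtime column is gone.
        List<Field> aggregated = List.of(
                new Field("user", FieldType.STRING), new Field("cnt", FieldType.BIGINT));
        System.out.println(timestampFieldIndex(aggregated)); // Optional.empty

        // A projection that forwards the rowtime column keeps a record timestamp.
        List<Field> projected = List.of(
                new Field("user", FieldType.STRING), new Field("rowtime", FieldType.ROWTIME));
        System.out.println(timestampFieldIndex(projected));  // Optional[1]

        // Two event-time columns, with the second cast to a plain TIMESTAMP: unambiguous again.
        List<Field> casted = List.of(
                new Field("rowtime1", FieldType.ROWTIME), new Field("rowtime2", FieldType.TIMESTAMP));
        System.out.println(timestampFieldIndex(casted));     // Optional[0]
    }
}
```

The first call corresponds to the aggregation query above: because the grouped result has no event-time column, there is nothing to set the record timestamp from.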

So usually, you would need to define a table with an event-time attribute [1] and just forward it in the SELECT clause.

However, your case is a bit more complicated.
Your query performs a non-windowed aggregation, which is not a time-based operation but produces updates/retractions.
Therefore, preserving the event-time timestamp is trickier, because we must ensure that the timestamps remain aligned with the watermarks.
The following two queries would ensure watermark alignment:
- SELECT user, COUNT(*), MAX(rowtime) FROM t GROUP BY user;
- SELECT user, COUNT(*), LAST_VAL(rowtime) FROM t GROUP BY user;

These queries would forward the maximum (or last) event-time timestamp (rowtime) to the result table.
However, neither of them works in the current or the upcoming version of Flink.
We also need to think about how timestamps would interact with retractions, because retractions should not be treated as late records.
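To see why retractions complicate this, here is a toy, single-threaded Java model of how SELECT user, COUNT(*), MAX(rowtime) FROM t GROUP BY user could emit a retract stream if each message took its timestamp from the MAX(rowtime) column. It is a hypothetical sketch, not Flink code (Flink does not support this, as noted above); Input, Result, Message and run are illustrative names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, not Flink code) of a non-windowed GROUP BY emitting a retract stream.
public class RetractWithMaxRowtime {

    // One input row of table t: the grouping key and its event-time attribute.
    record Input(String user, long rowtime) {}

    // One result row of SELECT user, COUNT(*), MAX(rowtime) FROM t GROUP BY user.
    record Result(String user, long count, long maxRowtime) {}

    // One retract-stream message; isAdd == false retracts a previously emitted row.
    // Assumption of this model: the message timestamp is the row's maxRowtime column.
    record Message(boolean isAdd, Result row) {
        long timestamp() { return row.maxRowtime(); }
    }

    static List<Message> run(List<Input> inputs) {
        Map<String, Result> state = new HashMap<>(); // latest result per user
        List<Message> out = new ArrayList<>();
        for (Input in : inputs) {
            Result previous = state.get(in.user());
            Result updated = previous == null
                    ? new Result(in.user(), 1, in.rowtime())
                    : new Result(in.user(), previous.count() + 1,
                                 Math.max(previous.maxRowtime(), in.rowtime()));
            if (previous != null) {
                out.add(new Message(false, previous)); // retract the old result first
            }
            out.add(new Message(true, updated));       // then emit the new result
            state.put(in.user(), updated);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Message> messages = run(List.of(
                new Input("alice", 1000L),
                new Input("bob", 1500L),
                new Input("alice", 3000L)));
        for (Message m : messages) {
            System.out.println((m.isAdd() ? "+ " : "- ") + m.row() + " @ " + m.timestamp());
        }
    }
}
```

With these inputs, the retraction of alice's first result carries timestamp 1000 but is emitted after bob's row at 1500. If the watermark had already passed 1500, a downstream event-time operator would treat that retraction as late, even though it is needed to keep the result correct.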

Best, Fabian


2018-03-23 0:02 GMT+01:00 Gregory Fee wrote: