Question about timestamp of StreamRecord

DongyangYao
Hey guys,
As far as I know, the timestamp field of a StreamRecord instance is the event time assigned by the assignTimestampsAndWatermarks method, provided I have set the job's time characteristic to event time. My confusion is that the timestamp is not carried through every operator the way I expect.
E.g., the Map operator, implemented by the StreamMap class:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    output.collect(element.replace(userFunction.map(element.getValue())));
}
The Flat Map operator, implemented by StreamFlatMap:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    collector.setTimestamp(element);
    userFunction.flatMap(element.getValue(), collector);
}
Probably also the aggregation operator, implemented by StreamGroupedReduce:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    IN value = element.getValue();
    IN currentValue = values.value();

    if (currentValue != null) {
        IN reduced = userFunction.reduce(currentValue, value);
        values.update(reduced);
        output.collect(element.replace(reduced));
    } else {
        values.update(value);
        output.collect(element.replace(value));
    }
}
Also the window operator, implemented by WindowOperator:
private void emitWindowContents(W window, ACC contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
    processContext.window = window;
    userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
}
All the operators above attach a timestamp to the StreamRecord they emit. I then wrote a very simple SQL query, i.e., select a + 10, b, c from tb. However, when I convert the result to a stream with toAppendStream or toRetractStream, the timestamp of the StreamRecord is null, as printed by Context.timestamp() in a ProcessFunction.
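
To make the expectation concrete, here is a minimal, self-contained sketch of the hand-off described above. It does not use Flink: Record is a hypothetical stand-in for StreamRecord, and mapLike is a hypothetical stand-in for what StreamMap does with replace(). The last step only models what a record emitted without a timestamp would look like downstream; it is an assumption for illustration, not the actual Table-to-DataStream conversion code.

```java
import java.util.function.Function;

public class TimestampPropagationSketch {

    /** Hypothetical stand-in for StreamRecord: a value plus an optional timestamp. */
    static final class Record<T> {
        private T value;
        private long timestamp;
        private boolean hasTimestamp;

        Record(T value) {
            this.value = value;
        }

        Record(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
            this.hasTimestamp = true;
        }

        T getValue() {
            return value;
        }

        /** Returns null when no timestamp is attached, mirroring what a user function would see. */
        Long timestampOrNull() {
            return hasTimestamp ? timestamp : null;
        }

        /** Swaps the value in place and leaves the timestamp untouched. */
        @SuppressWarnings("unchecked")
        <X> Record<X> replace(X newValue) {
            this.value = (T) newValue;
            return (Record<X>) this;
        }
    }

    /** Map-style operator: reuses the incoming record, so the timestamp survives. */
    static <IN, OUT> Record<OUT> mapLike(Record<IN> element, Function<IN, OUT> fn) {
        return element.replace(fn.apply(element.getValue()));
    }

    public static void main(String[] args) {
        Record<Integer> in = new Record<>(1, 1_000L);

        // replace() keeps the event time that was assigned upstream.
        Record<Integer> mapped = mapLike(in, a -> a + 10);
        System.out.println(mapped.getValue() + " @ " + mapped.timestampOrNull()); // 11 @ 1000

        // A record built without a timestamp carries none (assumed model of the observed null).
        Record<Integer> fresh = new Record<>(mapped.getValue());
        System.out.println(fresh.getValue() + " @ " + fresh.timestampOrNull()); // 11 @ null
    }
}
```

The sketch reuses the incoming record object, as element.replace(...) does in the operators quoted above, which is why the timestamp survives a map but not a freshly constructed record.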

Best regards,
Dongyang Yao


 

Re: Question about timestamp of StreamRecord

Timo Walther
Hi,

In SQL, event time is not part of the StreamRecord but a column in the
table. Thus, you need to extract it and specify the column name/location
when converting to the Table API:

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion

When converting back to the DataStream API, a single time column will be
converted back to the StreamRecord timestamp if the SQL query preserves
event time. For example, this is not the case for `toRetractStream`.
Usually, toAppendStream and event time play nicely together.

Regards,
Timo


On 18.01.21 13:48, DongyangYao wrote:


Re: Question about timestamp of StreamRecord

Timo Walther
Forget what I said before. I just tested the behavior, and it seems there
is a bug in the conversion logic.

I opened https://issues.apache.org/jira/browse/FLINK-21013

Thanks for reaching out to us.

Regards,
Timo

On 18.01.21 15:37, Timo Walther wrote:
