Hey guys,

As far as I know, the timestamp field of a StreamRecord instance holds the event time assigned by the assignTimestampsAndWatermarks method, provided I have set the job's time characteristic to event time. My confusion is that the timestamp is not carried through the different operators the way I expect. For example:

Map operator, implemented by the StreamMap class:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }

FlatMap operator, implemented by StreamFlatMap:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }

Probably the aggregation operator, implemented by StreamGroupedReduce:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        IN value = element.getValue();
        IN currentValue = values.value();

        if (currentValue != null) {
            IN reduced = userFunction.reduce(currentValue, value);
            values.update(reduced);
            output.collect(element.replace(reduced));
        } else {
            values.update(value);
            output.collect(element.replace(value));
        }
    }

Also the window operator, implemented by WindowOperator:

    private void emitWindowContents(W window, ACC contents) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        processContext.window = window;
        userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
    }

All the operators above carry the timestamp over to the outgoing StreamRecord. Then I wrote a very simple SQL query, i.e., select a + 10, b, c from tb. However, when I get the result stream via the toAppendStream or toRetractStream method, the StreamRecord timestamp is null, as printed by Context.timestamp() in a ProcessFunction.

Best regards,
Dongyang Yao
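The propagation pattern described in the question can be sketched in plain Java. This is a simplified, hypothetical model, not Flink code: `SketchRecord` and `TimestampPropagationSketch` are invented names that only mimic three ideas from the snippets, namely that `StreamRecord.replace()` keeps the existing timestamp, that the flat-map collector stamps every output with the input's timestamp, and that window results carry `window.maxTimestamp()` (for a time window, its end minus 1 ms). The zero-offset tumbling-window start computed here is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of how timestamps travel through DataStream operators.
// None of these classes are Flink APIs; they only mimic the snippets above.
final class TimestampPropagationSketch {

    // Map/reduce style: reuse the incoming record, so its timestamp is kept
    // (like output.collect(element.replace(...))).
    static <I, O> SketchRecord<O> map(SketchRecord<I> in, Function<I, O> fn) {
        return in.replace(fn.apply(in.value));
    }

    // FlatMap style: every emitted value is stamped with the input's timestamp
    // (like collector.setTimestamp(element) before calling the user function).
    static <I, O> List<SketchRecord<O>> flatMap(SketchRecord<I> in, Function<I, List<O>> fn) {
        List<SketchRecord<O>> out = new ArrayList<>();
        for (O value : fn.apply(in.value)) {
            out.add(new SketchRecord<>(value, in.timestamp));
        }
        return out;
    }

    // Start of the zero-offset tumbling window of the given size containing ts.
    static long tumblingWindowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Window style: the result gets the window's max timestamp, i.e. end - 1
    // for a window covering [start, start + size).
    static <O> SketchRecord<O> emitWindow(long start, long size, O result) {
        return new SketchRecord<>(result, start + size - 1);
    }

    public static void main(String[] args) {
        SketchRecord<Integer> input = new SketchRecord<>(5, 1_000L);
        SketchRecord<Integer> mapped = map(input, v -> v + 10);
        System.out.println(mapped.value + " @ " + mapped.timestamp); // 15 @ 1000

        long start = tumblingWindowStart(1_234L, 1_000L);
        SketchRecord<String> windowed = emitWindow(start, 1_000L, "sum");
        System.out.println(windowed.value + " @ " + windowed.timestamp); // sum @ 1999
    }
}

// Simplified stand-in for StreamRecord: a value plus a nullable timestamp.
final class SketchRecord<T> {
    T value;
    Long timestamp;

    SketchRecord(T value, Long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    // Swap the value in place and keep the timestamp, returning the same object.
    @SuppressWarnings("unchecked")
    <X> SketchRecord<X> replace(X newValue) {
        SketchRecord<X> self = (SketchRecord<X>) this;
        self.value = newValue;
        return self;
    }
}
```

Reusing the record object is what makes the timestamp "free" for map and reduce; only operators that build new records (flatMap, windows) have to set it explicitly.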
Hi,
In SQL, event time is not part of the StreamRecord but a column in the table. Thus, you need to extract it and specify the column name/position when converting to the Table API: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion

When converting back to the DataStream API, a single time column will be converted back into the StreamRecord timestamp if the SQL query preserves event time. This is not the case for `toRetractStream`, for example. Usually, toAppendStream and event time play nicely together.

Regards,
Timo

On 18.01.21 13:48, DongyangYao wrote:
> [original message snipped]
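As a rough illustration of the rule in this reply, here is a plain-Java model. All names, including the `rt` column, are hypothetical; this is not the Table API. Going into a table, the record timestamp has to become a column; coming back through an append stream, the value of that single time column is what can become the record timestamp again, so a projection that does not carry the time column forward leaves nothing to restore. In the real API the column is declared with `.rowtime()` during the DataStream-to-Table conversion, as described on the linked page. The model ignores that the real time column has a timestamp type rather than a raw long, and it shows the intended behavior only; the follow-up below reports a bug in the actual conversion logic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the time-attribute rule; these are not Flink APIs.
final class TimeAttributeSketch {

    // A DataStream-side record: named fields plus a nullable event timestamp.
    static final class Rec {
        final Map<String, Object> fields;
        final Long timestamp;

        Rec(Map<String, Object> fields, Long timestamp) {
            this.fields = fields;
            this.timestamp = timestamp;
        }
    }

    // DataStream -> Table: the record timestamp is materialized as a named
    // time column, because a table row has no separate timestamp slot.
    static Map<String, Object> toRow(Rec rec, String timeColumn) {
        Map<String, Object> row = new LinkedHashMap<>(rec.fields);
        row.put(timeColumn, rec.timestamp);
        return row;
    }

    // Models "select a + 10, b, c [, <time column>] from tb".
    static Map<String, Object> project(Map<String, Object> row, String timeColumn, boolean keepTime) {
        Map<String, Object> out = new LinkedHashMap<>();
        out.put("a", (Integer) row.get("a") + 10);
        out.put("b", row.get("b"));
        out.put("c", row.get("c"));
        if (keepTime) {
            out.put(timeColumn, row.get(timeColumn));
        }
        return out;
    }

    // Table -> DataStream (intended append-only behavior): if the result still
    // has the time column, its value becomes the record timestamp again.
    static Rec toRecord(Map<String, Object> row, String timeColumn) {
        return new Rec(new LinkedHashMap<>(row), (Long) row.get(timeColumn));
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("a", 1);
        fields.put("b", "x");
        fields.put("c", true);
        Map<String, Object> row = toRow(new Rec(fields, 1_000L), "rt");

        System.out.println(toRecord(project(row, "rt", true), "rt").timestamp);  // 1000
        System.out.println(toRecord(project(row, "rt", false), "rt").timestamp); // null
    }
}
```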
Forget what I said before. I just tested the behavior, and it seems there is a bug in the conversion logic. I opened https://issues.apache.org/jira/browse/FLINK-21013

Thanks for reaching out to us.

Regards,
Timo

On 18.01.21 15:37, Timo Walther wrote:
> [earlier messages snipped]