Timestamp Erasure

Dominik Wosiński
Hey, 
I just wanted to ask about one thing regarding timestamps. Currently, if I have a KeyedBroadcastProcessFunction followed by a temporal table join, it works like a charm. But say I want to delay emitting some of the results for whatever reason. If I call registerProcessingTimeTimer and any elements are emitted in the onTimer call, then the timestamps are erased, meaning that I simply get:
Caused by: java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.
at DataStreamSourceConversion$10.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
... 23 more
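
For reference, here is roughly the shape of the function I mean (just a sketch: the class name, the record types and the 10-second delay are placeholders for my actual code):

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

case class Event(id: String, amount: Double, ts: Long)
case class Rule(name: String)
case class Enriched(id: String, rule: String, amount: Double, ts: Long)

class DelayedEmitFunction
    extends KeyedBroadcastProcessFunction[String, Event, Rule, Enriched] {

  private var buffer: ListState[Event] = _

  override def open(parameters: Configuration): Unit = {
    buffer = getRuntimeContext.getListState(
      new ListStateDescriptor[Event]("buffered-events", classOf[Event]))
  }

  override def processElement(
      value: Event,
      ctx: KeyedBroadcastProcessFunction[String, Event, Rule, Enriched]#ReadOnlyContext,
      out: Collector[Enriched]): Unit = {
    // instead of emitting right away, buffer the element and delay it
    buffer.add(value)
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + 10000L)
  }

  override def processBroadcastElement(
      value: Rule,
      ctx: KeyedBroadcastProcessFunction[String, Event, Rule, Enriched]#Context,
      out: Collector[Enriched]): Unit = {
    // updating the broadcast state is omitted in this sketch
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedBroadcastProcessFunction[String, Event, Rule, Enriched]#OnTimerContext,
      out: Collector[Enriched]): Unit = {
    // everything collected here is what ends up without a rowtime downstream
    buffer.get().asScala.foreach { e =>
      out.collect(Enriched(e.id, "some-rule", e.amount, e.ts))
    }
    buffer.clear()
  }
}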

Is that the expected behavior? I haven't seen it described anywhere before and I wasn't able to find any docs specifying this.

Thanks in advance,
Best Regards,
Dom.
Re: Timestamp Erasure

Jark Wu
Hi Dom,

If you are converting a DataStream to a Table with a rowtime attribute, then the DataStream should carry event-time timestamps.
For example, call `assignTimestampsAndWatermarks` before converting to a table. You can find more details in the docs [1].
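
A rough sketch of what I mean (the Event type, its ts field, and the 5-second out-of-orderness bound are just placeholders):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._

object RowtimeConversionSketch {
  case class Event(id: String, amount: Double, ts: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tableEnv = StreamTableEnvironment.create(env)

    val events: DataStream[Event] = env.fromElements(Event("a", 1.0, 1000L))

    // assign event-time timestamps and watermarks before converting to a table
    val withTimestamps = events.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(5)) {
        override def extractTimestamp(e: Event): Long = e.ts
      })

    // 'rowtime.rowtime exposes the element timestamp as a rowtime attribute
    val table = tableEnv.fromDataStream(withTimestamps, 'id, 'amount, 'rowtime.rowtime)
  }
}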

Best,
Jark


Re: Timestamp Erasure

Dominik Wosiński
Yes, I understand this completely, but my question is a little bit different.

The issue is that if I have something like:
val firstStream = dataStreamFromKafka
  .assignTimestampsAndWatermarks(...)
val secondStream = otherStreamFromKafka
  .assignTimestampsAndWatermarks(...)
  .broadcast(...)

So now, if I do something like:
firstStream.keyBy(...).connect(secondStream)
  .process(someBroadcastProcessFunction)

Now, in the process function, when creating a new record, I select only one field from the second stream (and it is not the timestamp field), while from the first stream I select all fields, including the timestamp.

Then everything works like a charm, no issues there. But if I register a processing-time timer in someBroadcastProcessFunction and any element is produced from the onTimer function, then I get the issue described above.

Best Regards,
Dom.  

Re: Timestamp Erasure

Jark Wu
Hi Dom,

Elements emitted from a processing-time timer in a BroadcastProcessFunction or KeyedCoProcessFunction have their timestamps erased.
So you have to call `assignTimestampsAndWatermarks` again after that operator, or use an event-time timer.
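
For example, something along these lines (a sketch only; the Enriched type with a ts field and the 5-second bound are assumptions on my side):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object ReassignTimestampsSketch {
  // placeholder for whatever record type the broadcast process function emits
  case class Enriched(id: String, rule: String, amount: Double, ts: Long)

  // option 1: assign timestamps/watermarks again on the output of .process(...)
  def withRowtime(processed: DataStream[Enriched]): DataStream[Enriched] =
    processed.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Enriched](Time.seconds(5)) {
        override def extractTimestamp(e: Enriched): Long = e.ts
      })

  // option 2: inside the process function, register an event-time timer instead,
  // e.g. ctx.timerService().registerEventTimeTimer(ctx.timestamp() + delayMillis);
  // elements emitted from its onTimer keep the timer's timestamp
}

Option 2 avoids the extra watermark assignment, but it means the delay is expressed in event time rather than wall-clock time.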

Best,
Jark
