Event timestamps after data transformations

Chris Wildman
I have a question about event timestamps after a flatMap transformation. I am using the event-time time characteristic. I have two streams entering a CoFlatMap. Stream A simply updates state in that CoFlatMap and does not output any events. Stream B inserts events of type B, which then produce events of a third type, C.

Will the event timestamp from B be propagated to C? Do I need to add an explicit timestamp assigner for C?

All windowing in this topology is done on event C so my assumption was:
Stream A does not need a timestamp assigner or watermark generator
Stream B does not need a timestamp assigner or watermark generator
Stream C needs a timestamp assigner and watermark generator

The confusion as to whether event B's timestamp is propagated to event C arose from this sentence in the documentation: "Operators that consume multiple input streams (e.g., after a keyBy(…) or partition(…) function, or a union) track the event time on each of their input streams. The operator’s current event time is the minimum of the input streams’ event time. As the input streams update their event time, so does the operator."

Thanks for your help,
Chris



Re: Event timestamps after data transformations

Jamie Grier
You can handle this in multiple ways. If there is a natural timestamp in streamB, you can use it directly like this:

streamB
    .assignTimestamps(...) // your assigner
    .connect(streamA)
    .flatMap(...) // your CoFlatMapFunction
    .timeWindow(...)
    .whatever()

Here the event timestamp will be propagated from streamB to the output of your CoFlatMapFunction.  The collector that is passed to your CoFlatMapFunction will ensure that the elements emitted using that collector have the same timestamp as the input event.
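
To make the propagation concrete, here is a minimal plain-Java sketch of the idea. It is a toy model, not Flink code: `Stamped`, `StampingCollector`, and `EnrichingCoFlatMap` are hypothetical names invented for illustration, and the sketch only assumes that the runtime sets the collector's timestamp to that of the input element before calling the user function.

```java
import java.util.ArrayList;
import java.util.List;

public class TimestampPropagationSketch {

    /** A value paired with its event timestamp (toy stand-in for a stream record). */
    static final class Stamped<T> {
        final T value;
        final long timestamp;

        Stamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical collector: stamps each emitted value with the current input's timestamp. */
    static final class StampingCollector<T> {
        private final List<Stamped<T>> output = new ArrayList<>();
        private long currentTimestamp;

        void setTimestamp(long timestamp) {
            this.currentTimestamp = timestamp;
        }

        void collect(T value) {
            output.add(new Stamped<>(value, currentTimestamp));
        }

        List<Stamped<T>> output() {
            return output;
        }
    }

    /** Hypothetical co-flat-map: input 1 (B) emits C events, input 2 (A) only updates state. */
    static final class EnrichingCoFlatMap {
        private String latestA = "none";

        void flatMap1(String b, StampingCollector<String> out) {
            out.collect("C(" + b + "," + latestA + ")");
        }

        void flatMap2(String a, StampingCollector<String> out) {
            latestA = a; // state update only, nothing is emitted
        }
    }

    /** Feeds a few A and B elements through the function and returns the emitted C events. */
    static List<Stamped<String>> run() {
        EnrichingCoFlatMap fn = new EnrichingCoFlatMap();
        StampingCollector<String> out = new StampingCollector<>();

        fn.flatMap2("a1", out);                 // A element: updates state
        processB(new Stamped<>("b1", 1000L), fn, out);
        fn.flatMap2("a2", out);                 // A element: updates state
        processB(new Stamped<>("b2", 2000L), fn, out);
        return out.output();
    }

    /** Models the runtime: set the collector's timestamp from the input, then call the function. */
    static void processB(Stamped<String> b, EnrichingCoFlatMap fn, StampingCollector<String> out) {
        out.setTimestamp(b.timestamp);
        fn.flatMap1(b.value, out);
    }

    public static void main(String[] args) {
        for (Stamped<String> c : run()) {
            System.out.println(c.value + " @ " + c.timestamp);
        }
    }
}
```

In this sketch each C event carries the timestamp of the B element that produced it, without the user function ever touching timestamps itself, which is why no separate assigner on C is needed in the pipeline above.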

I hope that helps :)

-Jamie



On Tue, Jun 7, 2016 at 11:43 AM, Chris Wildman <[hidden email]> wrote:
[...]

--

Jamie Grier
data Artisans, Director of Applications Engineering