I have a question about event timestamps after a flatMap transformation. I am using the event time characteristic. I have two streams entering a CoFlatMap. Stream A simply updates state in that CoFlatMap and does not output any events. Stream B inserts events of type B, which then output a third type C.
Will the event timestamp from B be propagated to C? Do I need to add an explicit timestamp assigner for C? All windowing in this topology is done on event C so my assumption was:

- Stream A does not need a timestamp assigner or watermark generator
- Stream B does not need a timestamp assigner or watermark generator
- Stream C needs a timestamp assigner and watermark generator

The confusion as to whether event B's timestamp is propagated to event C arose from this sentence in the documentation:

"Operators that consume multiple input streams (e.g., after a keyBy(…) or partition(…) function, or a union) track the event time on each of their input streams. The operator’s current event time is the minimum of the input streams’ event time. As the input streams update their event time, so does the operator."

Thanks for your help,
Chris
You can handle this in multiple ways. If there is a natural timestamp in StreamB, you can use it directly by doing this:

    streamB
        .assignTimestamps(...)   // your assigner
        .connect(streamA)
        .flatMap(...)            // your CoFlatMapFunction
        .timeWindow(...)
        .whatever()

Here the event timestamp will be propagated from streamB to the output of your CoFlatMapFunction. The collector that is passed to your CoFlatMapFunction will ensure that the elements emitted using that collector have the same timestamp as the input event.

I hope that helps :)

-Jamie

On Tue, Jun 7, 2016 at 11:43 AM, Chris Wildman <[hidden email]> wrote:
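
As a rough illustration of the collector behavior described in the reply, here is a toy model in plain Java. It is not Flink code and does not use any Flink classes: `Stamped`, `StampingCollector`, and `Enricher` are hypothetical names made up for this sketch, and the string values and timestamps are invented sample data. The only point it tries to show is that a function can emit C elements without knowing about timestamps, because the collector attaches the timestamp of the B element currently being processed.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of timestamp propagation through a two-input flatMap. */
public class TimestampPropagationSketch {

    /** A value paired with an event timestamp (hypothetical stand-in for a stream record). */
    public static final class Stamped<T> {
        public final T value;
        public final long timestamp;

        Stamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Collector that stamps each emitted value with the timestamp of the current input. */
    static final class StampingCollector<T> {
        private final List<Stamped<T>> out = new ArrayList<>();
        private long currentTimestamp;

        // Called by the (simulated) operator before it hands an input element to the function.
        void setTimestamp(long timestamp) {
            currentTimestamp = timestamp;
        }

        void collect(T value) {
            out.add(new Stamped<>(value, currentTimestamp));
        }

        List<Stamped<T>> output() {
            return out;
        }
    }

    /** Hypothetical user function: input 1 (B) emits a C, input 2 (A) only updates state. */
    static final class Enricher {
        private String latestA = "none";

        void flatMap1(String b, StampingCollector<String> out) {
            out.collect(b + "/" + latestA); // the function never touches timestamps itself
        }

        void flatMap2(String a, StampingCollector<String> out) {
            latestA = a; // state update only, nothing emitted
        }
    }

    /** Simulates the operator loop for a short interleaving of A and B elements. */
    public static List<Stamped<String>> run() {
        Enricher fn = new Enricher();
        StampingCollector<String> collector = new StampingCollector<>();

        fn.flatMap2("a1", collector);     // A element: emits nothing, so no timestamp is needed here
        collector.setTimestamp(1000L);    // B element with event timestamp 1000
        fn.flatMap1("b1", collector);
        fn.flatMap2("a2", collector);
        collector.setTimestamp(2000L);    // B element with event timestamp 2000
        fn.flatMap1("b2", collector);

        return collector.output();
    }

    public static void main(String[] args) {
        for (Stamped<String> c : run()) {
            System.out.println(c.value + " @ " + c.timestamp);
        }
    }
}
```

In this model each C element carries the timestamp of the B element that produced it, which matches the behavior the reply describes. Watermark handling is deliberately left out of the sketch.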