event time & watermarks in connected streams with broadcast state


event time & watermarks in connected streams with broadcast state

Rinat
Hi mates, I've got some questions about using event time in a Flink pipeline.

My pipeline consists of two connected streams: one is a stream of business rules and the other is a stream of user events.

I've broadcast the stream of business rules and connected it to the stream of events, so I can apply all existing rules to each event.
For this purpose I've implemented a KeyedBroadcastProcessFunction that accumulates the rules in broadcast state and applies them to each event.
In this function I would like to register event-time timers.
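
A simplified sketch of that function (with placeholder Event/Rule types and a String output instead of my actual ones, not the real code) looks like this:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Placeholder types, just for the sketch.
case class Rule(id: String, created: Long)
case class Event(gid: String, timestamp: Long)

object CustomerJourneyProcessor {
  val RULES_STATE_DESCRIPTOR: MapStateDescriptor[String, Rule] =
    new MapStateDescriptor[String, Rule]("rules", classOf[String], classOf[Rule])
}

class CustomerJourneyProcessor
  extends KeyedBroadcastProcessFunction[String, Event, Rule, String] {

  import CustomerJourneyProcessor.RULES_STATE_DESCRIPTOR

  // Broadcast side: accumulate every incoming rule in broadcast state.
  override def processBroadcastElement(
      rule: Rule,
      ctx: KeyedBroadcastProcessFunction[String, Event, Rule, String]#Context,
      out: Collector[String]): Unit =
    ctx.getBroadcastState(RULES_STATE_DESCRIPTOR).put(rule.id, rule)

  // Keyed side: apply the accumulated rules to the event and register an
  // event-time timer. The timer only fires once the operator's watermark
  // passes its timestamp, and that watermark is the minimum over both inputs.
  override def processElement(
      event: Event,
      ctx: KeyedBroadcastProcessFunction[String, Event, Rule, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    val rules = ctx.getBroadcastState(RULES_STATE_DESCRIPTOR).immutableEntries().asScala
    // ... apply each rule in `rules` to the event and emit results via `out` ...
    ctx.timerService().registerEventTimeTimer(event.timestamp + 60 * 1000L)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedBroadcastProcessFunction[String, Event, Rule, String]#OnTimerContext,
      out: Collector[String]): Unit =
    out.collect(s"timer fired for key ${ctx.getCurrentKey} at $timestamp")
}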

I've specified an AssignerWithPeriodicWatermarks for the stream of events that extracts the event timestamp and uses it as both the timestamp and the watermark, but still had no success: the broadcast stream has no such assigner and always reports Long.MIN_VALUE as its watermark, so Flink takes the smallest watermark received from the two streams and event time never advances.
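
To make that concrete, the effect is roughly this (plain illustration only, not the actual Flink operator code):

// A two-input operator advances its event-time clock to the minimum of the
// latest watermarks received on each input. Without an assigner, the broadcast
// side stays at Long.MinValue, so the combined watermark never moves.
val eventsWatermark: Long   = 1551300000000L  // hypothetical watermark from the event stream
val rulesWatermark: Long    = Long.MinValue   // broadcast rules stream has no assigner
val combinedWatermark: Long = math.min(eventsWatermark, rulesWatermark)  // == Long.MinValue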

How can I solve this problem and use the timestamps from the event stream as the pipeline's event time?
Here is the configuration of my pipeline:

val bSegments = env
  .addSource(rules)
  .broadcast(CustomerJourneyProcessor.RULES_STATE_DESCRIPTOR)

val keyedEvents = env
  .addSource(events)
  .assignTimestampsAndWatermarks(eventTimeAssigner)
  .keyBy { event => event.getId.getGid }

keyedEvents
  .connect(bSegments)
  .process(customerJourneyProcessor)
  .addSink(sink)

I've found a workaround that works for me, but I'm not sure it's the proper solution.

I can add a timestamp/watermark assigner to the stream of rules that always returns System.currentTimeMillis() as the watermark; since that is always larger than the event timestamps, the KeyedBroadcastProcessFunction ends up using the event stream's watermark.

class RuleTimestampAssigner extends AssignerWithPeriodicWatermarks[Rule] {

  override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())

  override def extractTimestamp(rule: Rule, previousElementTimestamp: Long): Long = rule.created
}
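
Wired into the pipeline above it would look like this (sketch, reusing the same names as before):

val bSegments = env
  .addSource(rules)
  .assignTimestampsAndWatermarks(new RuleTimestampAssigner())
  .broadcast(CustomerJourneyProcessor.RULES_STATE_DESCRIPTOR)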

But it looks like a hack; maybe someone can advise a more convenient approach.

Thx!

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

mobile: +7 (925) 416-37-26

CleverDATA
make your data clever


Re: event time & watermarks in connected streams with broadcast state

Konstantin Knauf
Hi Rinat,

To my knowledge your workaround is fine and necessary. You can also emit Long.MAX_VALUE instead of the processing time to save some CPU cycles.
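
For example, something like this minimal sketch (assuming the Rule type from your pipeline) should do:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class MaxWatermarkRuleAssigner extends AssignerWithPeriodicWatermarks[Rule] {

  // Long.MaxValue means this input never holds back the combined event time.
  override def getCurrentWatermark: Watermark = new Watermark(Long.MaxValue)

  // The rule's creation time is still used as its element timestamp.
  override def extractTimestamp(rule: Rule, previousElementTimestamp: Long): Long =
    rule.created
}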

Cheers,

Konstantin



--

Konstantin Knauf | Solutions Architect

+49 160 91394525


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen   

Re: event time & watermarks in connected streams with broadcast state

Rinat
Thanks, Konstantin!

