Hello,
I have some patterns in my program, for example A followedBy B. I am using a Kafka source, and my event APIs sit behind load balancers, so sometimes B arrives before A. As a result, my CEP pattern doesn't generate any result for those events. I then tried event time and applied a "BoundedOutOfOrdernessTimestampExtractor" on the Kafka source, extracting the time from an origin-time field that I have in the event. I am using a watermark lateness of 10 seconds there. Now CEP has stopped generating results altogether; it doesn't even generate results when event B comes after A. I have also tried within(10 seconds) in CEP, and it still doesn't generate results. Am I doing anything wrong? I have to cover the case where B can come after A from Kafka.

Thanks & Regards
SHASHANK AGARWAL
--- Trying to mobilize the things....
Hi,
Are all the partitions always carrying data that has advancing timestamps? When using event time, the Kafka source (and Flink in general) needs steady progress in all partitions; otherwise the watermark does not advance, which in turn means that processing downstream will stall. Best, Aljoscha
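To make the point about partitions concrete, here is a small plain-Scala model (not Flink code; the object and method names are made up for illustration) of how an operator that reads several inputs, such as Kafka partitions or parallel source instances, combines their watermarks: it takes the minimum, so a single input without data holds the whole watermark back. It assumes each input uses the bounded-out-of-orderness rule from this thread (highest timestamp seen minus 10 seconds) and that an input that has seen nothing sits at Long.MinValue.

```scala
// Plain-Scala model of watermark combination; not Flink's implementation.
object PartitionWatermarks {
  // Bound used in the thread (Time.seconds(10)), in milliseconds.
  val MaxOutOfOrdernessMs: Long = 10000L

  // Watermark of a single input: highest timestamp seen minus the bound.
  // An input that has produced nothing yet stays at Long.MinValue.
  def inputWatermark(timestamps: Seq[Long]): Long =
    if (timestamps.isEmpty) Long.MinValue
    else timestamps.max - MaxOutOfOrdernessMs

  // Downstream watermark: the minimum over all inputs, so the slowest
  // (or an empty) input decides how far event time has progressed.
  def combinedWatermark(inputs: Seq[Seq[Long]]): Long =
    inputs.map(inputWatermark).min
}
```

For example, one input that has seen 1000000, 1005000 and 1020000 gives a combined watermark of 1010000, but adding a second input with no events drops the combined watermark to Long.MinValue, so nothing downstream that waits on the watermark will fire.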
Hi,

Yes, events will always carry an OriginTimestamp field, which I am using in the extractor. I have also added a fallback: if that data is missing, it uses System.currentTimeMillis(). Still, it's not printing results.

Best, Shashank

On Wed, Jan 3, 2018 at 7:40 PM, Aljoscha Krettek wrote:
Ok, but will there be events in all Kafka partitions/topics?
Actually, there are other topics in Kafka as well (around 5-6). I am consuming a particular topic 'x', which only contains events; the other topics hold different data. I use two consumers in my program for two different topics: for the first topic, x, I extract the timestamp from the origin-timestamp field; for the other one I use System.currentTimeMillis().

On Wed, Jan 3, 2018 at 8:06 PM, Aljoscha Krettek wrote:
Can you please check what the input watermark of your operations is? There is a metric called "currentLowWatermark" for this.
Best, Aljoscha
The low watermark shows the same value that I pass in the event ("1514994744412") for all the tasks related to that stream; the UI shows "(No watermark)" for the Kafka source. For the CEP pattern A followedBy B, this is what happens:

Event 1
- I passed A with origTimestamp X (low watermark updated to X): no results (this is right).
- I passed B with origTimestamp X1 (low watermark updated to X1): no results (results should be printed).

Event 2
- I passed A with origTimestamp Y (low watermark updated to Y): results of Event 1 printed (this is wrong).
- I passed B with origTimestamp Y1 (low watermark updated to Y1): no results (results should be printed).

Event 3
- I passed A with origTimestamp Z (low watermark updated to Z): results of Event 2 printed (this is wrong).
- I passed B with origTimestamp Z1 (low watermark updated to Z1): no results (results should be printed).

On Wed, Jan 3, 2018 at 8:30 PM, Aljoscha Krettek wrote:
What are the actual timestamps? If your BoundedOutOfOrderness extractor is lagging by 10 seconds, then seeing only Event 1.B would not trigger execution. Only the later Event 2.A is sufficiently far ahead to trigger execution, which is what you actually get.
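The reported sequence can be reproduced with a toy simulation. This is a sketch under stated assumptions, not Flink's CEP operator: the watermark is recomputed after every event as the highest timestamp seen minus 10 seconds (Flink emits it periodically instead), buffered events are released in timestamp order once their timestamp is at or below the watermark, and a hypothetical simplified matcher pairs every pending A with the next B. The object name and the timestamp offsets (B 2 seconds after A, the next pair 30 seconds later) are invented for illustration.

```scala
// Toy model of event-time buffering in front of an "A followedBy B" matcher.
// Not Flink's CEP operator; names and timestamps are illustrative only.
object CepWatermarkDemo {
  final case class Event(name: String, ts: Long)

  // Bound from the thread's BoundedOutOfOrdernessTimestampExtractor.
  val MaxOutOfOrdernessMs: Long = 10000L

  // For each arriving event, returns the (A, B) matches emitted right after it.
  def run(arrivals: Seq[Event]): Seq[Seq[(Event, Event)]] = {
    // Start so that the first watermark is Long.MinValue (no overflow).
    var maxTs = Long.MinValue + MaxOutOfOrdernessMs
    var buffered = Vector.empty[Event]  // events the watermark has not passed yet
    var pendingAs = Vector.empty[Event] // As still waiting for a following B

    arrivals.map { e =>
      buffered :+= e
      maxTs = math.max(maxTs, e.ts)
      val watermark = maxTs - MaxOutOfOrdernessMs

      // Release every buffered event with ts <= watermark, in event-time order.
      val (ready, waiting) = buffered.sortBy(_.ts).partition(_.ts <= watermark)
      buffered = waiting

      val out = Vector.newBuilder[(Event, Event)]
      for (r <- ready) {
        if (r.name == "A") pendingAs :+= r
        else if (r.name == "B") {
          pendingAs.foreach(a => out += ((a, r)))
          pendingAs = Vector.empty
        }
      }
      out.result()
    }
  }
}
```

Feeding A at t0, B at t0 + 2 s, A at t0 + 30 s and B at t0 + 32 s, nothing is emitted when the first B arrives; the match (A at t0, B at t0 + 2 s) only comes out when the A at t0 + 30 s pushes the watermark to t0 + 20 s, which mirrors "results of Event 1 printed" on Event 2.A. Feeding B before A (B at t0 + 2 s, then A at t0) yields the same single match once a later event advances the watermark, so in this model the out-of-order arrival itself is handled; the delay comes from waiting for the watermark.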
Hi shashank,
Which version of Flink are you using? Is it possible that you are hitting this issue: https://issues.apache.org/jira/browse/FLINK-7563? The watermark semantics in CEP were buggy: an event was processed only if its timestamp was lower than the current watermark, while it should be lower than or equal to it. Best, Dawid
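The difference described for FLINK-7563 can be written as two predicates. The sketch below is an illustration in plain Scala (the object and method names are made up), assuming the same 10-second bounded-out-of-orderness watermark as in the thread: it compares the pre-fix rule (timestamp strictly below the watermark) with the intended rule (timestamp at or below it), and computes how large the highest seen timestamp must be before an event at ts is released under each rule.

```scala
// Illustrative comparison of the two release rules; not Flink code.
object WatermarkComparison {
  // Bound from the thread's extractor (Time.seconds(10)), in milliseconds.
  val MaxOutOfOrdernessMs: Long = 10000L

  // Intended rule: a watermark W says no more events with timestamp <= W
  // are expected, so an event at ts can be processed once ts <= W.
  def releasedFixed(ts: Long, watermark: Long): Boolean = ts <= watermark

  // Rule described in FLINK-7563 for CEP before the fix: ts strictly below W.
  def releasedBuggy(ts: Long, watermark: Long): Boolean = ts < watermark

  // Smallest "max timestamp seen" that releases an event at ts, given the
  // watermark is (max timestamp seen - MaxOutOfOrdernessMs).
  def maxTimestampNeeded(ts: Long, strict: Boolean): Long =
    ts + MaxOutOfOrdernessMs + (if (strict) 1L else 0L)
}
```

In this model, an event at ts is released under either rule only after some event at least 10 seconds newer has been seen; the bug shifts that threshold by a single millisecond, so on its own it would not explain a match that waits for the next event.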
@Dawid, I was using 1.3.2, and I have also checked 1.4.0; I am still facing the same issue.

@Aljoscha, I have to cover the case where B can come after A from Kafka. How can I achieve this, given that event time is not working? How should I implement it? The pattern is A followedBy B. As I am using a Kafka source and my event APIs sit behind load balancers, sometimes B comes before A, so my CEP pattern doesn't generate any result for those events. I am trying to use event time like this. Am I doing anything wrong?

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.windowing.time.Time

    // Event, kafkaSource and Log are defined elsewhere in the program;
    // Log is assumed to be an SLF4J-style logger.
    kafkaSource.assignTimestampsAndWatermarks(
      // Watermark = highest timestamp seen so far minus 10 seconds.
      new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
        override def extractTimestamp(event: Event): Long = {
          try {
            // Prefer the origin timestamp, then the server timestamp,
            // and finally fall back to the current processing time.
            val originTime = event.origTimestamp.getOrElse("0").toLong
            if (originTime <= 0) {
              val serverTime = event.serverTimestamp.getOrElse("0").toLong
              if (serverTime <= 0) System.currentTimeMillis() else serverTime
            } else {
              originTime
            }
          } catch {
            case e: Exception =>
              Log.error("OriginTimestamp exception occurred", e)
              System.currentTimeMillis()
          }
        }
      }
    )

On Wed, Jan 3, 2018 at 9:42 PM, Dawid Wysakowicz wrote:
I think this is actually working as intended, judging from your earlier description of when results are produced: when you see Event 1.B, the watermark is not sufficiently advanced to trigger computation; only when you see Event 2.A does the watermark advance and you get a result. This is what I would expect to happen.
But this will be wrong in my case: I have to wait for the results until I receive the next event.

On Thu, Jan 4, 2018 at 3:53 PM, Aljoscha Krettek wrote:
Yes, because event time only advances if something makes it advance, basically.