Flink CEP with event time


Flink CEP with event time

shashank734
Hello,

I have some patterns in my program. For example,

 A followedBy B.

I am using a Kafka source, and my event APIs sit behind load balancers, so sometimes B arrives before A. In that case CEP does not generate any result for those events.

I then switched to event time and applied a "BoundedOutOfOrdernessTimestampExtractor" on the Kafka source, extracting the timestamp from an origin-time field that I have in the event. I am using a watermark lag (maximum out-of-orderness) of 10 seconds.

Now CEP has stopped generating results. It does not even generate results when event B comes after A. I have also tried within(10 seconds) in CEP, but it still generates no results.

Am I doing anything wrong?

I have to cover the case where B can come after A from Kafka.

--
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things....





Re: Flink CEP with event time

Aljoscha Krettek
Hi,

Are all the partitions always carrying data with advancing timestamps? When using event time, the Kafka source (and Flink in general) needs steady progress in all partitions; otherwise the watermark does not advance, which in turn stalls processing downstream.
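
A minimal sketch of why one quiet partition can hold everything back, assuming the usual rule that a combined watermark is the minimum of the watermarks of its inputs. The object and method names are hypothetical, and this is a plain Scala model rather than Flink code:

```scala
object PartitionWatermarks {
  // Hypothetical model: every partition tracks its own watermark.
  // Long.MinValue stands for "no watermark yet" (the partition has seen no data).
  def combined(perPartition: Seq[Long]): Long =
    if (perPartition.isEmpty) Long.MinValue else perPartition.min

  // All three partitions make progress: the slowest one sets the pace.
  val allActive: Long = combined(Seq(1000L, 1200L, 1100L))

  // One partition never receives data: the combined watermark never moves.
  val oneIdle: Long = combined(Seq(1000L, 1200L, Long.MinValue))
}
```

In this model `allActive` is 1000 and `oneIdle` stays at `Long.MinValue`, which is the stalled situation described above.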

Best,
Aljoscha


Re: Flink CEP with event time

shashank734
Hi,

Yes, events always carry an OriginTimestamp field, which I am using in the extractor. I have also added a fallback: if that data is missing, I use System.currentTimeMillis().

Still, it's not printing results.

Best,
Shashank







Re: Flink CEP with event time

Aljoscha Krettek
Ok, but will there be events in all Kafka partitions/topics?


Re: Flink CEP with event time

shashank734
Actually, there are other topics in Kafka as well (around 5-6 topics). I am consuming one particular topic, 'x', which contains only events. The other topics carry different data.

I am using two consumers in my program, for 2 different topics. For the first topic, x, I extract the timestamp from the origintimestamp field; for the other one, I use System.currentTimeMillis().






Re: Flink CEP with event time

Aljoscha Krettek
Can you please check what the input watermark of your operations is? There is a metric called "currentLowWatermark" for this.

Best,
Aljoscha


Re: Flink CEP with event time

shashank734
The low watermark shows the same value that I am passing in the event ("1514994744412") for all the tasks related to that stream. The UI shows "(No watermark)" for the Kafka source.

This is what I observe for the CEP pattern A followedBy B:

Event 1
- I passed A with origTimestamp X (low watermark updated to X): no results (this is right).
- I passed B with origTimestamp X1 (low watermark updated to X1): no results (results should be printed).

Event 2
- I passed A with origTimestamp Y (low watermark updated to Y): results of Event 1 printed (this is wrong).
- I passed B with origTimestamp Y1 (low watermark updated to Y1): no results (results should be printed).

Event 3
- I passed A with origTimestamp Z (low watermark updated to Z): results of Event 2 printed (this is wrong).
- I passed B with origTimestamp Z1 (low watermark updated to Z1): no results (results should be printed).





Re: Flink CEP with event time

Aljoscha Krettek
What are the actual timestamps? If your BoundedOutOfOrdernessTimestampExtractor is lagging by 10 seconds, then seeing only Event 1.B would not trigger execution. Only the later Event 2.A is sufficiently far ahead to trigger execution, which is what you actually get.
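
The effect of the 10-second lag can be sketched with a small plain Scala model. The timestamps below are made up for illustration (the thread never states the real ones), and the model recomputes the watermark on every event, whereas Flink emits it periodically. It assumes that an event is only processed once the watermark has reached its timestamp:

```scala
object BoundedLagDemo {
  final case class Event(name: String, timestamp: Long)

  // The 10-second bound used in the thread.
  val maxOutOfOrdernessMs: Long = 10000L

  // For each incoming event, returns the watermark after that event and the
  // names of the buffered events that the watermark has released.
  def run(events: Seq[Event]): Seq[(Long, Seq[String])] = {
    var maxTs = Long.MinValue
    var buffered = Vector.empty[Event]
    events.map { e =>
      maxTs = math.max(maxTs, e.timestamp)
      val watermark = maxTs - maxOutOfOrdernessMs // watermark trails the max timestamp
      buffered = buffered :+ e
      val (ready, waiting) = buffered.partition(_.timestamp <= watermark)
      buffered = waiting
      (watermark, ready.sortBy(_.timestamp).map(_.name))
    }
  }

  // Hypothetical timestamps in milliseconds: 1.B is 1 s after 1.A, 2.A is 60 s after 1.A.
  val trace: Seq[(Long, Seq[String])] = run(Seq(
    Event("1.A", 0L),
    Event("1.B", 1000L),
    Event("2.A", 60000L)
  ))
}
```

With these numbers the watermark is -10000 after 1.A and -9000 after 1.B, so nothing is released. Only 2.A pushes it to 50000, which releases 1.A and 1.B together, matching the "results of Event 1 printed when Event 2.A arrives" observation.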


Re: Flink CEP with event time

Dawid Wysakowicz
In reply to this post by shashank734
Hi shashank,

What version of Flink are you using? Is it possible that you are hitting this issue: https://issues.apache.org/jira/browse/FLINK-7563 ?

The watermark semantics in CEP were buggy: an event was processed only if its timestamp was lower than the current watermark, while it should be processed if its timestamp is lower than or equal to it.
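
The difference between the two comparisons can be illustrated with a tiny plain Scala sketch. The function names are hypothetical and this is not the actual CEP operator code:

```scala
object WatermarkComparison {
  // Behaviour described as buggy: only timestamps strictly below the watermark pass.
  def releasedStrict(timestamp: Long, watermark: Long): Boolean =
    timestamp < watermark

  // Intended behaviour: timestamps lower than or equal to the watermark pass.
  def releasedInclusive(timestamp: Long, watermark: Long): Boolean =
    timestamp <= watermark
}
```

The two only disagree when an event's timestamp is exactly equal to the watermark: the strict check keeps that event buffered, the inclusive one releases it.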

Best
Dawid


Re: Flink CEP with event time

shashank734
@Dawid, I was using 1.3.2. I have also checked on 1.4.0 and am still facing the same issue.


@Aljoscha, I have to cover the case where B can come after A from Kafka. How can I achieve this, given that event time is not working? How should I implement this?

 A followedBy B.

I am using a Kafka source, and my event APIs sit behind load balancers, so sometimes B arrives before A. In that case CEP does not generate any result for those events.

I am trying to use event time like this. Am I doing anything wrong?


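 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
 import org.apache.flink.streaming.api.windowing.time.Time

 kafkaSource.assignTimestampsAndWatermarks(
        // The watermark trails the highest timestamp seen so far by 10 seconds.
        new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
          override def extractTimestamp(event: Event): Long = {
            try {
              // Prefer the origin timestamp, then the server timestamp, then wall-clock time.
              val originTime = event.origTimestamp.getOrElse("0").toLong
              if (originTime > 0) {
                originTime
              } else {
                val serverTime = event.serverTimestamp.getOrElse("0").toLong
                if (serverTime > 0) serverTime else System.currentTimeMillis()
              }
            } catch {
              case e: Exception =>
                // Log is the poster's own logger; a (message, throwable) signature is assumed here.
                Log.error("OriginTimestamp exception occurred", e)
                System.currentTimeMillis()
            }
          }
        }
      )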


Re: Flink CEP with event time

Aljoscha Krettek
I think this is actually working as intended, judging from your earlier description of when results are produced: when you see Event 1.B, the watermark has not advanced far enough to trigger computation; only when you see Event 2.A does the watermark advance, and you get a result. This is what I would expect to happen.



Re: Flink CEP with event time

shashank734
But this will be wrong in my case. It means I have to wait for the results until I receive the next event.





Re: Flink CEP with event time

Aljoscha Krettek
Yes, because event-time only advances if something makes it advance. Basically.
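
One possible way to give event time "something that makes it advance" when the stream goes quiet is a watermark generator that stops holding back the 10-second lag after a period of inactivity. This was not proposed in the thread; it is only a sketch in plain Scala with hypothetical names and an assumed 5-second idle timeout. In Flink, logic of this kind would typically live in a custom AssignerWithPeriodicWatermarks. The trade-off is that an out-of-order event arriving after the idle timeout would be treated as late.

```scala
object IdleAwareWatermark {
  // Hypothetical generator: trails the max timestamp by boundMs while events
  // keep arriving, and catches up to the max timestamp once the stream has
  // been idle for idleTimeoutMs of wall-clock time.
  final class Generator(boundMs: Long, idleTimeoutMs: Long) {
    private var maxTs = Long.MinValue
    private var lastEventWallClock = 0L
    private var emitted = Long.MinValue

    def onEvent(eventTs: Long, nowMs: Long): Unit = {
      maxTs = math.max(maxTs, eventTs)
      lastEventWallClock = nowMs
    }

    // Meant to be polled periodically; never moves the watermark backwards.
    def currentWatermark(nowMs: Long): Long = {
      if (maxTs != Long.MinValue) {
        val idle = nowMs - lastEventWallClock >= idleTimeoutMs
        val candidate = if (idle) maxTs else maxTs - boundMs
        emitted = math.max(emitted, candidate)
      }
      emitted
    }
  }

  val g = new Generator(boundMs = 10000L, idleTimeoutMs = 5000L)
  g.onEvent(eventTs = 0L, nowMs = 100L)    // Event 1.A
  g.onEvent(eventTs = 1000L, nowMs = 200L) // Event 1.B

  val whileActive: Long = g.currentWatermark(nowMs = 300L) // still lagging
  val afterIdle: Long = g.currentWatermark(nowMs = 6000L)  // idle for 5.8 s
}
```

In this model `whileActive` is -9000, so 1.A and 1.B stay buffered, while `afterIdle` is 1000, which would release both events without waiting for Event 2.A.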
