Issues with Event Time and Kafka


ext.mwalker
Hi Folks,

We are working on a Flink job to process a large amount of data coming in from a Kafka stream.

We selected Flink because the data is sometimes out of order or late, and we need to roll up the data into 30-minute event-time windows, after which we write it back out to an S3 bucket.

We have hit a couple of issues:

1) The job works fine using processing time, but when we switch to event time (almost) nothing seems to be written out.
Our watermark code looks like this:
```
  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis() - maxLateness)
  }
```
And we are doing this:
```
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
```
and this:
```
    .assignTimestampsAndWatermarks(new TimestampAndWatermarkAssigner(Time.minutes(30).toMilliseconds))
```

However, even though we receive millions of records per hour (the vast majority of which are no more than 30 minutes late), only about 2 to 10 records per hour are written out to the S3 bucket.
We are using a custom BucketingFileSink Bucketer; if folks believe that is the issue, I would be happy to provide that code here as well.

2) On top of all this, we would really prefer to write the records directly to Aurora in RDS rather than to an intermediate S3 bucket, but it seems that a JDBC sink connector is unsupported or doesn't exist.
If this is not the case we would love to know.

Thanks in advance for all the help / insight on this,

Max Walker

Re: Issues with Event Time and Kafka

Stephan Ewen
Hi!

At first glance, your code for assigning the watermarks looks correct. What is the watermark interval in your config?

Can you check the Flink metrics (if you are using 1.2) to see how many rows leave the source, how many enter and leave the window operators, etc.?

That should help with figuring out why there are so few result rows.

Stephan


(In reply to ext.mwalker, Mon, Mar 6, 2017 at 8:57 PM)

Re: Issues with Event Time and Kafka

ext.mwalker
Hi Stephan,

The right number of events seems to leave the source and enter the windows, but the metrics show that 0 exit the windows.

Also, I have tried setting the watermark interval to 30 minutes and leaving it unset; I am not sure what I am supposed to put there, as the docs seem vague about it.
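For reference: with a periodic assigner like the one in the original post, the watermark interval is the processing-time period, in milliseconds, at which Flink calls `getCurrentWatermark()`. It is set through `ExecutionConfig.setAutoWatermarkInterval` (for example `env.getConfig.setAutoWatermarkInterval(200L)`), and switching the time characteristic to `EventTime` sets it to 200 ms by default. It says nothing about how late events may be. The plain-Scala sketch below has no Flink dependency; `WatermarkIntervalModel` and its methods are hypothetical names that only model the polling, to show how few watermark updates a 30-minute interval produces.

```scala
// Hypothetical model of periodic watermark polling (no Flink dependency).
object WatermarkIntervalModel {
  private val HourMs: Long = 60L * 60L * 1000L

  // Number of times per hour that getCurrentWatermark() would be polled
  // for a given auto-watermark interval (milliseconds of processing time).
  def pollsPerHour(intervalMs: Long): Long = {
    require(intervalMs > 0, "interval must be positive")
    HourMs / intervalMs
  }

  // Processing-time offsets (ms after start) of the polls that happen
  // within the first `horizonMs` milliseconds of running.
  def pollOffsets(intervalMs: Long, horizonMs: Long): List[Long] = {
    require(intervalMs > 0, "interval must be positive")
    Iterator.iterate(intervalMs)(_ + intervalMs).takeWhile(_ <= horizonMs).toList
  }
}
```

For example, `pollsPerHour(200L)` is 18,000 while `pollsPerHour(30L * 60L * 1000L)` is 2. Because windows fire only when the watermark advances past their end, a 30-minute interval can add up to another 30 minutes of delay before any result is emitted, on top of the lateness bound.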

Best,

Max

(In reply to Stephan Ewen, Tue, Mar 7, 2017 at 1:54 PM)

Re: Issues with Event Time and Kafka

Dawid Wysakowicz
Hi Max,
How do you assign timestamps to your events (in the event-time case)? Could you post the whole code for your TimestampAndWatermarkAssigner?

Regards,
Dawid

(In reply to ext.mwalker, 2017-03-07 20:59 GMT+01:00)

Re: Issues with Event Time and Kafka

ext.eformichella
Hi Dawid, I'm working with Max on the project.
Our code for the TimestampAndWatermarkAssigner is:
```
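import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Row: the application's own class for the JSON records read from Kafka.
class TimestampAndWatermarkAssigner(val maxLateness: Long) extends AssignerWithPeriodicWatermarks[Row] {

  // Event time of each record, taken from the record itself.
  override def extractTimestamp(element: Row, previousElementTimestamp: Long): Long = {
    element.minTime
  }

  // Polled periodically by Flink; this version derives the watermark from the
  // machine's wall clock, not from the timestamps returned by extractTimestamp.
  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis() - maxLateness)
  }
}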
```

Here, Row is a class representing the incoming JSON object from Kafka, which includes the timestamp (the minTime field).

Thanks,
-Ethan

Re: Issues with Event Time and Kafka

Dawid Wysakowicz
Hi Ethan,

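Then I believe the problem is that the watermarks and timestamps in your implementation are uncorrelated: the watermark follows the machine's wall clock (System.currentTimeMillis()), not the event timestamps returned by extractTimestamp. A watermark is really a marker that says there will be no more elements with a timestamp smaller than (or equal to) the value of this watermark. For more info on the concept, see [1].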

In your case, since you say that events can "lag" by up to 30 minutes, you should try the BoundedOutOfOrdernessTimestampExtractor. It is designed exactly for a case like yours.
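To make the difference concrete, here is a plain-Scala model (no Flink dependency) of the two strategies discussed in this thread. `wallClockWatermark` mirrors the posted `getCurrentWatermark()`, while `BoundedOutOfOrdernessModel` imitates the idea behind `BoundedOutOfOrdernessTimestampExtractor`: remember the largest event timestamp seen so far and emit that maximum minus a fixed bound. An element is treated as late once its timestamp is at or below the current watermark. All object names and sample timestamps here are hypothetical; in the actual job one would instead subclass `BoundedOutOfOrdernessTimestampExtractor[Row](Time.minutes(30))` and override `extractTimestamp(element: Row): Long` to return `element.minTime`.

```scala
// Hypothetical model of two watermark strategies (no Flink dependency).
object WatermarkModels {
  // Mirrors the posted getCurrentWatermark(): it depends only on the
  // machine's wall clock, never on the timestamps of the events.
  def wallClockWatermark(nowMs: Long, maxLatenessMs: Long): Long =
    nowMs - maxLatenessMs

  // Imitates the idea behind BoundedOutOfOrdernessTimestampExtractor: the
  // watermark trails the largest event timestamp seen so far by a fixed bound.
  final class BoundedOutOfOrdernessModel(maxOutOfOrdernessMs: Long) {
    // Start low enough that the first watermark is Long.MinValue (no overflow).
    private var maxSeenMs: Long = Long.MinValue + maxOutOfOrdernessMs

    def onEvent(eventTimeMs: Long): Unit =
      maxSeenMs = math.max(maxSeenMs, eventTimeMs)

    def currentWatermark: Long = maxSeenMs - maxOutOfOrdernessMs
  }

  // An element counts as late once its timestamp is at or below the watermark.
  def isLate(eventTimeMs: Long, watermarkMs: Long): Boolean =
    eventTimeMs <= watermarkMs
}
```

Under an assumed backlog in which events stamped 07:00 to 07:20 are consumed at 10:00 wall-clock time, `wallClockWatermark` is already at 09:30, so every one of them is late, whereas the bounded model's watermark is 06:50 and an event stamped 07:05 would still be on time.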

Regards,
Dawid

(In reply to ext.eformichella, 2017-03-07 22:33 GMT+01:00)

Re: Issues with Event Time and Kafka

ext.eformichella
Thanks for the suggestion; we can definitely try that out.

My one concern is that events can technically lag by days or even months in some cases, but we only care about including events that lag by 30 minutes or so, and would like events that lag further to be ignored. I just want to make sure that doesn't require special handling.

I also want to make sure I understand the maximum-lateness watermark correctly. Suppose a watermark is generated, and then an element with an older timestamp arrives. My understanding was that this element should be ignored, but from our results it looks like the late element actually overwrites the aggregate of the on-time elements. Is this expected behavior?

Thank you for your help!
-Ethan

(In reply to Dawid Wysakowicz, Tue, Mar 7, 2017 at 6:01 PM)

Re: Issues with Event Time and Kafka

rmetzger0
Hi Ethan,

How late elements (elements whose event timestamp is at or behind the current watermark) are handled depends on the operator. Flink's window operators will trigger a firing of the affected window for each late element that falls within the "allowed lateness" timeframe. Otherwise, late elements are dropped.
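A plain-Scala sketch of that rule for tumbling event-time windows, assuming the default `EventTimeTrigger` and a lateness budget configured with `allowedLateness(...)` on the windowed stream (zero unless set). The `LateDataModel` object and its outcome names are hypothetical; the comparisons imitate what the window operator does with a window's largest timestamp (`end - 1`). An element whose window has already been passed by the watermark is added and fires the window again while that window is still within its allowed lateness, and is dropped afterwards. Each such firing emits an updated aggregate for the same window, which may be what looks like a late element "overwriting" the on-time result.

```scala
// Hypothetical model of late-element handling for one tumbling window.
object LateDataModel {
  sealed trait Outcome
  case object Buffered extends Outcome   // window not complete yet: wait for the watermark
  case object LateFiring extends Outcome // within allowed lateness: window fires immediately
  case object Dropped extends Outcome    // beyond allowed lateness: element is discarded

  // Start of the tumbling window (offset 0) that contains timestamp `ts`.
  def windowStart(ts: Long, sizeMs: Long): Long = ts - Math.floorMod(ts, sizeMs)

  def classify(ts: Long, sizeMs: Long, allowedLatenessMs: Long, watermark: Long): Outcome = {
    // Largest timestamp still inside the window, i.e. window end minus one.
    val maxTimestamp = windowStart(ts, sizeMs) + sizeMs - 1
    if (maxTimestamp + allowedLatenessMs <= watermark) Dropped
    else if (maxTimestamp <= watermark) LateFiring
    else Buffered
  }
}
```

With no allowed lateness, an element from a window the watermark has already passed is simply dropped. Events lagging by days also fall outside any 30-minute lateness budget and are dropped in the same way, so under this model they need no special handling.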

(In reply to ext.eformichella, Thu, Mar 9, 2017 at 5:30 PM)

Re: Issues with Event Time and Kafka

Aljoscha Krettek
What Robert said is correct. However, that behaviour depends on the Trigger. You can write your own Trigger that behaves differently when late data arrives; for example, you could write a trigger that never fires for late data. In that case, however, you could also simply set the allowed lateness to zero. You could also write a trigger that waits for a certain number of late elements to arrive and then fires.
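The second kind of trigger can be pictured as a small state machine. The plain-Scala sketch below has no Flink dependency; `BatchedLateFiring` and its parameters are hypothetical, and its two methods only imitate the roles of `Trigger.onElement` and `Trigger.onEventTime`. A real implementation would extend Flink's `Trigger` class and keep the counter in the trigger's partitioned state instead of a field. Late elements only reach the trigger while the window is still within its allowed lateness; after that, the window operator drops them before the trigger is consulted.

```scala
// Hypothetical model of a trigger that batches late firings (no Flink dependency).
object LateTriggerModel {
  sealed trait Decision
  case object Continue extends Decision
  case object Fire extends Decision

  final class BatchedLateFiring(windowMaxTimestamp: Long, lateBatchSize: Int) {
    require(lateBatchSize > 0, "lateBatchSize must be positive")
    private var firedOnTime = false
    private var pendingLate = 0

    // Per-element callback (cf. Trigger.onElement); `watermark` is the
    // operator's current event-time watermark.
    def onElement(watermark: Long): Decision =
      if (watermark < windowMaxTimestamp) Continue // on time: wait for the watermark
      else {
        // Late element: fire only after every `lateBatchSize` of them.
        pendingLate += 1
        if (pendingLate >= lateBatchSize) { pendingLate = 0; Fire } else Continue
      }

    // Watermark callback (cf. Trigger.onEventTime): the regular, on-time firing.
    def onWatermark(watermark: Long): Decision =
      if (!firedOnTime && watermark >= windowMaxTimestamp) { firedOnTime = true; Fire }
      else Continue
  }
}
```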

Best,
Aljoscha


(In reply to Robert Metzger, Fri, Mar 10, 2017, at 20:14)