Hi Folks,
We are working on a Flink job to process a large amount of data coming in from a Kafka stream. We selected Flink because the data is sometimes out of order or late, and we need to roll the data up into 30-minute event-time windows before writing it back out to an S3 bucket. We have hit a couple of issues:

1) The job works fine using processing time, but when we switch to event time (almost) nothing seems to be written out. Our watermark code looks like this:

```
override def getCurrentWatermark(): Watermark = {
  new Watermark(System.currentTimeMillis() - maxLateness)
}
```

And we are doing this:

```
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
```

and this:

```
.assignTimestampsAndWatermarks(new TimestampAndWatermarkAssigner(Time.minutes(30).toMilliseconds))
```

However, even though we get millions of records per hour (the vast majority of which are no more than 30 minutes late), only about 2 - 10 records per hour are written out to the S3 bucket. We are using a custom BucketingSink Bucketer; if folks believe that is the issue, I would be happy to provide that code here as well.

2) On top of all this, we would really prefer to write the records directly to Aurora in RDS rather than to an intermediate S3 bucket, but it seems that a JDBC sink connector is unsupported / doesn't exist. If this is not the case we would love to know.
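For reference, here is roughly how the job is wired together (a simplified sketch; the connector version, key and field names, deserialization schema, and roll-up function below are illustrative, not our exact code):

```
import java.util.Properties

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val kafkaProps = new Properties() // broker / group settings omitted

env
  .addSource(new FlinkKafkaConsumer010[Row]("events", new RowSchema, kafkaProps))
  .assignTimestampsAndWatermarks(
    new TimestampAndWatermarkAssigner(Time.minutes(30).toMilliseconds))
  .keyBy(_.key)                 // illustrative key field
  .timeWindow(Time.minutes(30)) // 30-minute event-time windows
  .reduce((a, b) => a.merge(b)) // illustrative roll-up function
  .addSink(new BucketingSink[Row]("s3://our-bucket/rollups")) // custom Bucketer omitted

env.execute("rollup-job")
```

Thanks in advance for all the help / insight on this,

Max Walker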
Hi!
At a first glance, your code for assigning the watermarks looks correct. What is your watermark interval in the config? Can you check with the Flink metrics (if you are using 1.2) to see how many rows leave the source, how many enter/leave the window operators, etc.? That should help figure out why there are so few result rows...
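For reference, the interval for periodic watermark assigners is set on the execution config; a minimal sketch (the 1000 ms value is just an example):

```
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// With an AssignerWithPeriodicWatermarks, getCurrentWatermark() is only
// consulted every autoWatermarkInterval milliseconds; an interval of 0
// disables periodic watermark emission entirely.
env.getConfig.setAutoWatermarkInterval(1000L) // e.g. emit a watermark every second
```

Stephan

On Mon, Mar 6, 2017 at 8:57 PM, ext.mwalker <[hidden email]> wrote: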
Hi Stephan, The right number of events seems to leave the source and enter the windows, but the metrics show that 0 exit the windows. Also, I have tried 30 minutes as well as not setting the watermark interval at all; I am not sure what I am supposed to put there, as the docs seem vague about that. Best, Max On Tue, Mar 7, 2017 at 1:54 PM, Stephan Ewen [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
Hi Max, How do you assign timestamps to your events (in the event-time case)? Could you post the whole code for your TimestampAndWatermarkAssigner? Regards, Dawid 2017-03-07 20:59 GMT+01:00 ext.mwalker <[hidden email]>:
Hi Dawid, I'm working with Max on the project.
Our code for the TimestampAndWatermarkAssigner is:

```
class TimestampAndWatermarkAssigner(val maxLateness: Long)
    extends AssignerWithPeriodicWatermarks[Row] {

  override def extractTimestamp(element: Row, previousElementTimestamp: Long): Long = {
    element.minTime
  }

  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis() - maxLateness)
  }
}
```

Where Row is a class representing the incoming JSON object coming from Kafka, which includes the timestamp.

Thanks,
-Ethan
Hi Ethan, In your case, since you say that events can "lag" by up to 30 minutes, you should try the BoundedOutOfOrdernessTimestampExtractor, which advances the watermark based on the largest event timestamp seen so far rather than on the system clock.
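A minimal sketch of what that could look like for your Row type (reusing the minTime field from your snippet):

```
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// The watermark is the maximum event timestamp seen so far minus 30 minutes,
// so it advances with the data instead of with the wall clock.
class RowTimestampExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[Row](Time.minutes(30)) {
  override def extractTimestamp(element: Row): Long = element.minTime
}

// usage:
// stream.assignTimestampsAndWatermarks(new RowTimestampExtractor)
```

Regards, Dawid 2017-03-07 22:33 GMT+01:00 ext.eformichella <[hidden email]>: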
Thanks for the suggestion, we can definitely try that out. My one concern is that events can technically lag for days or even months in some cases, but we only care about including the events that lag by 30 minutes or so, and we would like the further-lagging events to be ignored - I just want to make sure that doesn't require special handling.

I also want to make sure I'm understanding the maximum-lateness watermark correctly. Suppose a watermark gets generated, and then an element with an older timestamp arrives. My understanding was that that element should be ignored, but from our results it looks like the late element actually overwrites the aggregate of the on-time elements. Is this expected behavior?

Thank you for your help! -Ethan On Tue, Mar 7, 2017 at 6:01 PM, Dawid Wysakowicz [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
Hi Ethan, how late elements (elements with an event time behind the watermark) are handled depends on the operator. Flink's window operators will fire a window again for each late element that falls into the "allowed lateness" timeframe. Otherwise, late elements are dropped.
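A sketch of how that is configured on a window definition (the key and reduce function are illustrative):

```
import org.apache.flink.streaming.api.windowing.time.Time

stream
  .keyBy(_.key)
  .timeWindow(Time.minutes(30))
  // A late element arriving within the extra 30 minutes fires the window
  // again with an updated result; anything later than that is dropped.
  .allowedLateness(Time.minutes(30))
  .reduce((a, b) => a.merge(b))
```

On Thu, Mar 9, 2017 at 5:30 PM, ext.eformichella <[hidden email]> wrote: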
What Robert said is correct. However, that behaviour depends on the Trigger. You can write your own Trigger that behaves differently when late data arrives; for example, a trigger that never fires for late data (although in that case you could also simply set the allowed lateness to zero). You could also write a trigger that waits for a certain number of late elements to arrive and then fires.
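A rough, untested sketch of such a trigger (analogous to the built-in event-time trigger, minus the late firings):

```
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Fires once when the watermark passes the end of the window; elements
// arriving after that point never cause another firing.
class IgnoreLateDataTrigger extends Trigger[Any, TimeWindow] {

  override def onElement(element: Any, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    if (window.maxTimestamp <= ctx.getCurrentWatermark) {
      // Late element: ignore it instead of re-firing the window.
      TriggerResult.CONTINUE
    } else {
      ctx.registerEventTimeTimer(window.maxTimestamp)
      TriggerResult.CONTINUE
    }
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp) TriggerResult.FIRE else TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.deleteEventTimeTimer(window.maxTimestamp)
}

// usage: .timeWindow(Time.minutes(30)).trigger(new IgnoreLateDataTrigger)
```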
Best,
Aljoscha
On Fri, Mar 10, 2017, at 20:14, Robert Metzger wrote: