Batch stream Sink delay ?


Paul Smith

Using Flink 1.2.

 

Hi all, I have a question about Batch processing and Sinks.  I have a Flink job that parses a User Request log file that contains performance data per request. It accumulates metric data values into 15 minute time windows.  Each log line is mapped to multiple records, so that each of the metric values can be 'billed' against a few different categories (User, Organisation, Project, Action).  The goal of the Flink job is to distil the volume of request data into more manageable summaries.  We can then see breakdowns of, say, User CPU utilised by distinct categories (e.g. Users), allowing us to look for trends/outliers.

 

It is working very well as a batch, but with one unexpected behaviour.

 

What I can't understand is that the Sink does not appear to get _any_ records until the rest of the chain has completed over the entire file.  I've played around with parallelism (1->3), and also verified with logging that the Sink isn't seeing any data until the entire previous chain is complete.  Is this expected behaviour? I was thinking that as each Time Window passed, a 'block' of the results would be emitted to the sink.  Since we use Event Time characteristics, the batch job ought to emit these chunks as each 15 minute segment passes?

 

You can see the base job, along with an example of a single log line and its metrics, here:

 

https://gist.github.com/tallpsmith/a2e5212547fb3c7220b0e49846d2f152

 

Each Category I've called an 'identifierType' (User, Organisation..), with the 'identifier' being the value of that (a UserId for example).  I key the stream by this pair of fields and then Fold the records by the Time window, summing each metric type's value up.  I have yet to work out the proper use case for Fold versus Reduce; I may have got that wrong, but I can't see how that changes the flow here.  A simplified sketch of the job is below.
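
For reference, the job is roughly this shape (a simplified sketch of the gist above - the class and field names here are illustrative placeholders, not the exact ones from the real job):

    // imports from org.apache.flink.streaming.api.* omitted for brevity
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<MetricRecord> records = env
        .readTextFile("/path/to/request.log")                  // performance log, one line per request
        .flatMap(new LogLineToMetricRecords())                  // one log line -> one record per identifierType
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MetricRecord>() {
            @Override
            public long extractAscendingTimestamp(MetricRecord r) {
                return r.getTimestampMillis();                  // epoch millis parsed from the log line
            }
        });

    records
        .keyBy(new KeySelector<MetricRecord, String>() {
            @Override
            public String getKey(MetricRecord r) {
                return r.getIdentifierType() + ":" + r.getIdentifier();
            }
        })
        .window(TumblingEventTimeWindows.of(Time.minutes(15)))
        .fold(new MetricSummary(), new SumMetricsFold())        // sums each metric type's value per window
        .addSink(new SummarySink());                            // this is the sink that stays silent until the end

    env.execute("request log summary");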

 

The output is a beautiful rolled up summary by 15 minutes by each identifierType & identifier.  I have yet to attempt a live Streaming version of this, but had thought the Batch version would also be concurrent and start emitting 15 minute windows as soon as the stream transitions into the next window.   Given the entire job takes about 5 minutes on my laptop for 8 million raw source lines whose data is spread out over an entire day, I would have thought there's some part of that 5 minutes that would be emitting chunks of summary data to the sink?  But nothing turns up until the entire job is done.

 

Maybe the data is just too small, or maybe there's buffering going on somewhere in the chain?

 

Any pointers would be appreciated in understanding the flow here.

 

Cheers,

 

Paul Smith

 

 

Re: Batch stream Sink delay ?

Fabian Hueske-2
Hi Paul,

This might be an issue with the watermarks. A window operation can only compute and emit its results when the watermark time is later than the end time of the window.
Each operator keeps track of the maximum timestamp received from each of its input tasks and computes its own time as the minimum of all those maximum timestamps.
If one (or all) of the watermarks received by the window operator is not later than the end time of the window, the window will not be computed.
When a file input is completely processed, Flink sends a Long.MAX_VALUE watermark, which is what might trigger the execution at the end of the job.

I would try to debug the watermarks of your job. The web dashboard provides a few metrics for that.

Best, Fabian


Re: Batch stream Sink delay ?

Paul Smith

Thanks Fabian, I’m pretty sure you are correct here.  I can see in the Metric view that the currentLowWaterMark is set to MIN_VALUE by the looks of it, so Watermarks are not being emitted at all until the end.  It stays that way all the way through the job.

 

I’m not sure why this is the case.  I’ve verified that my TimestampExtractor class is being called and returning the value I’d expect (the timestamp from the log line), which looks legitimate.  My WindowFunction which is doing the aggregation is not being called until right at the end of the job, but yet the windows come out OK in the destination.

 

I am not sure how to debug this one.  Do you or anyone else have any other suggestions on how to debug why the Windowing is not being triggered?  I can’t glean any further useful ideas from the metrics I can see.

 

Appreciate the help

 

Cheers,


Paul

 


Re: Batch stream Sink delay ?

Fabian Hueske-2
Hi Paul,

Since each operator uses the minimum watermark of all its inputs, you must ensure that each parallel task is producing data.
If a source does not produce data, it will not increase the timestamps of its watermarks.

Another challenge that you might run into is that you need to make sure that the file (or the file chunks, if it is split for parallel reading) is read in increasing timestamp order.
Otherwise, watermarks will be emitted too early.

If I understood your use case correctly, you are just experimenting with file input to get a feeling for the API.
I would try to set the parallelism of the file source to 1 to ensure that the data is read in order and that all tasks are producing data.
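
For example (just a sketch; the rest of the pipeline stays unchanged):

    DataStream<String> lines = env
        .readTextFile("/path/to/request.log")
        .setParallelism(1);   // single reader task -> lines arrive in file order

    // downstream operators can keep a higher parallelism; only the source is forced to one task here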

Hope this helps,
Fabian


Re: Batch stream Sink delay ?

Paul Smith
Thanks again for your reply. 

I've tried with parallelism from 1 through to 3. Same behaviour.

The log file has monotonically increasing timestamps, generated by an application using log4j. Each log line has a distinctly incrementing timestamp. It is an 8GB file I'm using as a test case, with about 8 million rows.

Whether the parallelism is 1 or 3, the same output is shown: the data gets to the sink at the end and all looks correct - I set the folded result record's timestamp to the end of each window, and I see nice chunks of 15 minute blocks in the result.

I'm not sure why the watermarks are not being sent as the data progresses. 

I might try pushing the data (same data) through Kafka and see if I get a different result. I might also take a sample of the file (rather than the whole file) to see if I get differing results.

Is there a wiki page anywhere that shows how to debug a job through an IDE?  Can I easily remote attach to a running process via standard Java options?

Regards

Paul 


Re: Batch stream Sink delay ?

Fabian Hueske-2
What kind of timestamp and watermark extractor are you using?
Can you share your implementation?

You can have a look at the example programs (for example [1]). These can be started and debugged inside an IDE by executing the main method.
If you run it in an external process, you should be able to connect to the process with the standard options.
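
For remote attaching, the standard JDWP agent flag works; add it to the JVM options of the process you want to debug (nothing Flink-specific about it):

    -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005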

Best, Fabian


Re: Batch stream Sink delay ?

Paul Smith

I have managed to discover that my presumption that a log4j log file is in _guaranteed_ sequential time order is incorrect (race conditions).  So some log lines are out of sequence, and I was getting _some_ Monotonic Timestamp violations.  I had not discovered this earlier because somehow my local Flink was not outputting logs properly (a restart of the service seemed to fix that).

 

*HOWEVER*: once I switched to an “allow lateness” style timestamp assigner, I still get the same problem.


Fundamentally I think I’ve come to the realisation that my concept of Event Time is not the same as Flink’s.  So let me start again and explain what I’d like to do.


I’d like to group log records and sum up the metric values according to their timestamp – effectively a SQL GROUP BY where the timestamp of the event is aggregated into 15 minute chunks.  I’d like (eventually) to be able to run this same job for both Batch & Live streaming.

 

I had thought that by assigning the log row’s timestamp to BE the Event Time, the Time Window aggregation would work. But now I wonder if what I should be doing is assigning a new field, called ‘time bracket’ perhaps, and adding that as one of the keys for the stream (I was keying by “identifierType:identifier”, so change that to “identifierType:identifier:timeBracket”).  This way the Fold function can still sum up the metrics by this group-by key, and then rely on some periodic _actual time_ watermarking (Ingestion or Processing time).  Roughly what I mean is sketched below.
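
Just to illustrate the idea (a sketch, not code from the actual job; the record getters are the same placeholders as before):

    // truncate each record's timestamp down to its 15 minute bracket
    long bracketMillis = 15 * 60 * 1000L;
    long timeBracket = record.getTimestampMillis() - (record.getTimestampMillis() % bracketMillis);

    // key by category, value and bracket so the Fold can group without relying on event-time windows
    String key = record.getIdentifierType() + ":" + record.getIdentifier() + ":" + timeBracket;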

 

I think I should disconnect the Flink time system from this aggregation level. I had thought that because I wanted to be able to apply this job to both Live Streaming and Batch processing (historical logs), I needed to use the Event Time model.

 

That could be my fatal flaw.  Does that make sense?

 

Paul

 


Re: Batch stream Sink delay ?

Fabian Hueske-2
Actually, I think the program you shared in the first mail of this thread looks fine for the purpose you describe.

Timestamp and watermark assignment works as follows:
- For each record, a long timestamp is extracted (a UNIX/epoch timestamp).
- A watermark is a timestamp which says that no more records with a timestamp lower than the watermark will be processed. Records which violate that condition are dropped by default.
- So the watermark should follow the max emitted record timestamp minus a safety margin.
- If you use a periodic watermark assigner, the watermark function is called periodically (see the sketch below).
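
As a rough sketch (the record type and getter are placeholders), a periodic assigner following these rules looks like this:

    public class LogRecordWatermarks implements AssignerWithPeriodicWatermarks<MetricRecord> {

        private final long maxOutOfOrderness = 60_000L;                        // 1 minute safety margin
        private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness; // avoid underflow before any record

        @Override
        public long extractTimestamp(MetricRecord record, long previousElementTimestamp) {
            long ts = record.getTimestampMillis();
            currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
            return ts;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // called periodically (autoWatermarkInterval); the watermark trails the max timestamp by the margin
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }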

Best, Fabian



Re: Batch stream Sink delay ?

Paul Smith

Due to the slightly out-of-sequence log timestamps, I tried switching to a “BoundedOutOfOrdernessTimestampExtractor” and used a minute as the threshold, but I still couldn’t get the watermarks to fire.  It is essentially the sketch below.  Setting breakpoints and trying to follow the code, I can’t see where getCurrentWatermark() is being called..?
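
(The assigner, roughly - the record type and getter are placeholders for my actual classes:)

    DataStream<MetricRecord> withTimestamps = records
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<MetricRecord>(Time.minutes(1)) {
                @Override
                public long extractTimestamp(MetricRecord record) {
                    return record.getTimestampMillis();   // epoch millis from the log line
                }
            });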

Is that done via a periodic timer?  The autoWatermarkInterval (which is using the default) – is that responsible, as a periodic “poller” of the current watermarks, for triggering things?

 


Paul

 

 

 

 


Reply | Threaded
Open this post in threaded view
|

Re: Batch stream Sink delay ?

Fabian Hueske-2
Yes, an AssignerWithPeriodicWatermarks is periodically called. The interval is configured with StreamExecutionEnvironment.getConfig().setAutoWatermarkInterval().
When you configure event-time mode (setStreamTimeCharacteristic(TimeCharacteristic.EventTime)), the default is set to 200 ms.
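
In code, that is roughly (the 500 ms value below is only an example):

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Event-time mode sets the auto-watermark interval to 200 ms by default ...
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // ... and it can be overridden explicitly (value in milliseconds).
    env.getConfig().setAutoWatermarkInterval(500L);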

Best, Fabian

2017-03-17 0:14 GMT+01:00 Paul Smith <[hidden email]>:

Due to the slightly out-of-sequence log timestamps, I tried switching to a “BoundedOutOfOrdernessTimestampExtractor” and used a minute as the threshold, but I still couldn’t get the watermarks to fire.  Setting breakpoints and trying to follow the code, I can’t see where getCurrentWatermark() is being called?

Is that done via a periodic timer?  Is the autoWatermarkInterval (which is using the default) responsible, as a periodic “poller” of the current watermark, for triggering things?
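
For reference, the switch I made looks roughly like this (lines is the source stream, and extractEpochMillis() is a placeholder for my log-line parser):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    DataStream<String> stamped = lines.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<String>(Time.minutes(1)) {
            @Override
            public long extractTimestamp(String line) {
                return extractEpochMillis(line); // placeholder: parse the log line's timestamp
            }
        });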

 


Paul

 

 

 

 

From: Fabian Hueske <[hidden email]>
Reply-To: "[hidden email]" <[hidden email]>
Date: Friday, 17 March 2017 at 9:11 am


To: "[hidden email]" <[hidden email]>
Subject: Re: Batch stream Sink delay ?

 

Actually, I think the program you shared in the first mail of this thread looks fine for the purpose you describe.

Timestamp and watermark assignment works as follows:

- For each record, a long timestamp is extracted (a UNIX/epoch timestamp).

- A watermark is a timestamp which says that no more records with a timestamp lower than the watermark will be processed. Records which violate that condition are by default dropped.

- So the watermark should follow the max emitted record timestamp minus a safety margin.

- If you use a periodic watermark assigner, the watermark function is periodically called.
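
A minimal periodic assigner along these lines might look like the following sketch (the one-minute margin and extractEpochMillis() are only examples, not your actual implementation):

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class LogLineWatermarkAssigner implements AssignerWithPeriodicWatermarks<String> {

        private static final long MAX_OUT_OF_ORDERNESS = 60_000L; // 1 minute safety margin (example)

        // Start low enough that subtracting the margin cannot underflow.
        private long maxSeenTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

        @Override
        public long extractTimestamp(String line, long previousElementTimestamp) {
            long ts = extractEpochMillis(line); // placeholder: your own log-line parser
            maxSeenTimestamp = Math.max(maxSeenTimestamp, ts);
            return ts;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // Called periodically (every autoWatermarkInterval ms); the watermark
            // trails the maximum seen timestamp by the safety margin.
            return new Watermark(maxSeenTimestamp - MAX_OUT_OF_ORDERNESS);
        }
    }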

Best, Fabian

 

 

2017-03-16 22:54 GMT+01:00 Paul Smith <[hidden email]>:

I have managed to discover that my presumption that a log4j log file is in _guaranteed_ sequential time order is incorrect (race conditions).  So some logs are out of sequence, and I was getting _some_ monotonic timestamp violations.  I had not discovered this earlier because somehow my local Flink was not outputting logs properly (a restart of the service seemed to fix that).

 

*HOWEVER*: once I switched to an “allow lateness” style timestamp assigner, I still get the same problem.


Fundamentally I think I’ve come to the realisation that my concept of Event Time is not the same as Flink’s.  So let me start again and explain what I’d like to do.


I’d like to group log records and sum up the metric values according to their timestamp – effectively a SQL group-by where the timestamp of the event is aggregated into 15 minute chunks.  I’d like to be able (eventually) to run this same job for both batch and live streaming.

 

I had thought that by assigning the log row’s timestamp to BE the Event Time, the Time Window aggregation would work. But now I wonder if what I should be doing is assigning a new field, called ‘time bracket’ perhaps, and adding that as one of the keys for the stream (I was keying by “identifierType:identifier”, so change that to “identifierType:identifier:timeBracket”).  This way the Fold function can still sum up the metrics by this group-by key, and then rely on some periodic _actual time_ watermarking (ingestion or processing time).
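
Roughly what I have in mind (the variable names here are illustrative only, not from my actual job):

    // Bucket the record's own timestamp into a 15 minute bracket and make the
    // bracket part of the grouping key, instead of relying on event-time windows.
    long bracketMillis = 15L * 60L * 1000L;
    long timeBracket = eventTimestampMillis - (eventTimestampMillis % bracketMillis);
    String key = identifierType + ":" + identifier + ":" + timeBracket;

The stream would then be keyed by that composite key and folded/summed per key, with ingestion- or processing-time semantics deciding when results are emitted.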

 

I think I should disconnect the Flink time system from this aggregation level. I had thought that because I wanted to be able to apply this job to both live streaming and batch processing (historical logs), I needed to use the Event Time model.

 

That could be my fatal flaw.  Does that make sense?

 

Paul

 

From: Fabian Hueske <[hidden email]>
Reply-To: "[hidden email]" <[hidden email]>
Date: Thursday, 16 March 2017 at 10:22 pm


To: "[hidden email]" <[hidden email]>
Subject: Re: Batch stream Sink delay ?

 

What kind of timestamp and watermark extractor are you using?

Can you share your implementation?

You can have a look at the example programs (for example [1]). These can be started and debugged inside an IDE by executing the main method.
If you run it in an external process, you should be able to connect to the process with the standard options.
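
For a process started outside the IDE, I believe you can pass the standard JDWP agent options, for example via env.java.opts in flink-conf.yaml (the port is arbitrary):

    env.java.opts: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005

and then attach your IDE's remote debugger to that port.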

Best, Fabian

 

2017-03-16 12:10 GMT+01:00 Paul Smith <[hidden email]>:

Thanks again for your reply. 

 

I've tried with parallelism from 1 through to 3. Same behavior.

 

The log file has monotonically increasing timestamps, generated by an application using log4j. Each log line has a distinctly incrementing timestamp. It is an 8GB file I'm using as a test case, with about 8 million rows.

 

Whether the parallelism is 1 or 3, the same output is shown: the data gets to the sink at the end and all looks correct - I set the folded result record's timestamp to the end of each window, and I see nice chunks of 15 minute blocks in the result.

 

I'm not sure why the watermarks are not being sent as the data progresses. 

 

I might try pushing the (same) data through Kafka and see if I get a different result. I might also take a sample of the file (rather than the whole file) to see if I get differing results.

 

Is there a wiki page anywhere that shows how to debug a job through an IDE?  Can I easily remote-attach to a running process via standard Java options?

 

Regards

 

Paul 


On 16 Mar 2017, at 21:15, Fabian Hueske <[hidden email]> wrote:

Hi Paul,

since each operator uses the minimum watermark of all its inputs, you must ensure that each parallel task is producing data.

If a source does not produce data, it will not increase the timestamps of its watermarks.

Another challenge you might run into is that you need to make sure the file (or the file chunks, if it is split for parallel reading) is read in increasing timestamp order.

Otherwise, watermarks will be emitted too early.

If I understood your use case correctly, you are just experimenting with file input to get a feeling for the API.

I would try to set the parallelism of the file source to 1 to ensure that the data is read in the same order and that all tasks are producing data.

Hope this helps,

Fabian

 
