Executing a controllable benchmark in Flink


Felipe Gutierrez
Hi,

I am trying to benchmark a stream application in Flink. I am using
a SourceFunction that reads events from the NYC Taxi Rides data set
(http://training.ververica.com/trainingData/nycTaxiRides.gz), and I
control the emission rate with System.nanoTime(). I am not using
Thread.sleep because Java does not guarantee exactly when the thread
will be woken up.

public void busySleep() {
    final long startTime = System.nanoTime();
    // spin until the configured delay has elapsed
    while (System.nanoTime() - startTime < this.delayInNanoSeconds) ;
}

So, with a wait of 10000 nanoseconds per record I should get a workload
of 100K rec/sec; with 2000 nanoseconds, 500K rec/sec; with 1000
nanoseconds, 1M rec/sec; and with 500 nanoseconds, 2M rec/sec.
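
As a rough sketch of the arithmetic behind these figures: the delay is one second divided by the target rate, and because busySleep() above starts its timer only when it is called, any time spent reading and emitting a record is added on top of the delay. The class and method names are made up for illustration, and the 500 ns of per-record work is a hypothetical figure, not a measurement.

```java
final class RateMath {
    static final long NANOS_PER_SECOND = 1_000_000_000L;

    /** Busy-wait delay (in ns) that targets the given rate, ignoring per-record work. */
    static long delayNanosFor(long recordsPerSecond) {
        return NANOS_PER_SECOND / recordsPerSecond;
    }

    /** Rate reached when each record costs workNanos before the wait of delayNanos starts. */
    static double effectiveRate(long delayNanos, long workNanos) {
        return (double) NANOS_PER_SECOND / (delayNanos + workNanos);
    }

    public static void main(String[] args) {
        for (long rate : new long[] {100_000, 500_000, 1_000_000, 2_000_000}) {
            System.out.println(rate + " rec/sec -> wait " + delayNanosFor(rate) + " ns");
        }
        // Hypothetical: 500 ns of read/parse/emit work per record with a 1000 ns wait.
        System.out.println("effective rate: " + effectiveRate(1_000, 500) + " rec/sec");
    }
}
```

Under that assumption a 1000 ns wait yields roughly 667K rec/sec rather than 1M, which would be consistent with the shortfall described below.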

The problem I am facing is that when I set the workload to 1M
rec/sec, the source does not seem to generate events at that rate. I
guess it is because it spends extra time reading the TaxiRide file or
doing other IO operations, or maybe it is some Java limitation.
If I use a message broker, I will be adding one more piece of middleware
with its own read/write IO operations, and I guess that will be worse.
What do you recommend for running a controllable benchmark for stream processing?

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: Executing a controllable benchmark in Flink

rmetzger0
Hi Felipe,

the file is just 80 MB. It is probably cached in the Linux page cache, so there should not be any disk IO involved.
So what you are saying is that you cannot increase the throughput any further with sleeps shorter than 2000 nanoseconds.
Have you tried running this without any sleep / System.nanoTime() calls? These calls can potentially be expensive as well.
Running the source in a simple while loop should give you the theoretical maximum.

If you really want to generate data at high speed, I would pre-generate a dataset on the heap (do not run any RNG, as it will waste CPU cycles) and emit that.
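
A minimal sketch of that idea, assuming a plain Consumer<String> as a stand-in for the Flink source context and made-up "ride-N" strings in place of parsed events. The records are built once up front, and the emit loop only cycles over the in-memory list, with no pacing, so the measured rate approximates the unthrottled maximum of the loop itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class PreGeneratedEmitter {
    /** Builds the records once, so the emit loop does no parsing, IO, or RNG work. */
    static List<String> preGenerate(int size) {
        List<String> records = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            records.add("ride-" + i); // hypothetical payload; a real run would hold parsed events
        }
        return records;
    }

    /** Emits totalRecords by cycling over the in-memory data; returns elapsed nanoseconds. */
    static long emit(List<String> records, long totalRecords, Consumer<String> sink) {
        final int size = records.size();
        final long start = System.nanoTime();
        int index = 0;
        for (long emitted = 0; emitted < totalRecords; emitted++) {
            sink.accept(records.get(index));
            if (++index == size) {
                index = 0; // wrap around and replay the data set
            }
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        List<String> records = preGenerate(10_000);
        long[] count = {0};
        long elapsed = emit(records, 5_000_000L, r -> count[0]++);
        System.out.printf("%d records in %d ns (%.0f rec/sec)%n",
                count[0], elapsed, count[0] * 1e9 / Math.max(1, elapsed));
    }
}
```

In a real job the sink would be the source context's collect call, and the number printed here would only be an upper bound, since serialization and downstream operators add their own cost.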

In general: VisualVM is your friend for understanding where you are losing cycles.

Best,
Robert


On Thu, May 28, 2020 at 12:06 AM Felipe Gutierrez <[hidden email]> wrote:
[...]

Re: Executing a controllable benchmark in Flink

Felipe Gutierrez
I was losing throughput because I was reading the line from the
GZIPInputStream outside of the busy-wait loop. I changed that, and now I
am getting more throughput: the start time is taken before the record is
parsed and emitted, so that work counts toward the delay. It is also a
good idea to use VisualVM to check whether the throughput is correct and
where I am losing the most cycles.


while (reader.ready() && (line = reader.readLine()) != null) {
    startTime = System.nanoTime();
    taxiRide = TaxiRide.fromString(line);
    sourceContext.collectWithTimestamp(taxiRide, getEventTime(taxiRide));
    this.dataRateListener.busySleep(startTime);
}

public void busySleep(long startTime) {
    long deadLine = startTime + this.delayInNanoSeconds;
    // spin until the deadline; compare by subtraction, since nanoTime() values can overflow
    while (System.nanoTime() - deadLine < 0) ;
}
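
One possible variation on this, not something used in the thread: keep an absolute schedule, so that each deadline is a fixed period after the previous deadline rather than after the moment the record was picked up. Time spent in readLine(), or an overshoot in one iteration, is then absorbed by the next wait. The FixedRatePacer class is hypothetical, and Thread.onSpinWait() assumes Java 9 or later.

```java
final class FixedRatePacer {
    private final long periodNanos;
    private long nextDeadline;

    FixedRatePacer(long periodNanos) {
        this.periodNanos = periodNanos;
        this.nextDeadline = System.nanoTime() + periodNanos;
    }

    /** Spins until the current deadline, then schedules the next one a fixed period later. */
    void awaitNext() {
        // Compare by subtraction, since nanoTime() values can overflow.
        while (System.nanoTime() - nextDeadline < 0) {
            Thread.onSpinWait(); // hint to the CPU that this is a busy-wait loop (Java 9+)
        }
        nextDeadline += periodNanos;
    }

    public static void main(String[] args) {
        final long periodNanos = 1_000; // targets 1M rec/sec
        final int records = 1_000_000;
        long start = System.nanoTime();
        FixedRatePacer pacer = new FixedRatePacer(periodNanos);
        for (int i = 0; i < records; i++) {
            // read, parse, and emit one record here
            pacer.awaitNext();
        }
        long elapsed = System.nanoTime() - start;
        System.out.printf("%d records in %d ns (%.0f rec/sec)%n",
                records, elapsed, records * 1e9 / elapsed);
    }
}
```

The trade-off is that when the loop falls behind, the pending deadlines are already in the past, so records are emitted in a burst until the schedule catches up. Whether that is acceptable depends on what the benchmark is meant to measure.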

Thanks!


On Fri, May 29, 2020 at 12:46 PM Robert Metzger <[hidden email]> wrote:

> [...]