Posted by
Felipe Gutierrez on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Executing-a-controllable-benchmark-in-Flink-tp35525p35599.html
I was losing throughput because I was reading the line from the
GZIPInputStream outside of the busy while loop. I changed it, and now I
am getting more throughput. It is also a good idea to use VisualVM to
check whether the throughput is correct and where I am losing the most cycles.
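// Read each line inside the emission loop and pace it against a per-record deadline.
while (reader.ready() && (line = reader.readLine()) != null) {
    startTime = System.nanoTime();               // start of this record's time slot
    taxiRide = TaxiRide.fromString(line);
    sourceContext.collectWithTimestamp(taxiRide, getEventTime(taxiRide));
    this.dataRateListener.busySleep(startTime);  // spin until startTime + delay
}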
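// Busy-waits until delayInNanoSeconds have passed since startTime, so the
// parsing/emission work above counts toward the delay instead of adding to it.
public void busySleep(long startTime) {
    long deadLine = startTime + this.delayInNanoSeconds;
    while (System.nanoTime() < deadLine) ; // spin instead of Thread.sleep(), whose wake-up time is not guaranteed
}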
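The following self-contained sketch shows the same pattern so that it can be tried outside Flink. Lines are read from a GZIPInputStream inside the emission loop, and each record is paced against a deadline taken just before that record is processed. Several parts are assumptions for illustration and are not Flink API:

* the class name `PacedGzipReader`
* the in-memory gzip payload
* the `Consumer<String>` sink, which stands in for `TaxiRide.fromString` plus `sourceContext.collectWithTimestamp`

`reader.ready()` is left out because `readLine()` returning null already signals the end of input. `Thread.onSpinWait()` (Java 9+) is only a spin-loop hint, so the wait is still a busy wait.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class PacedGzipReader {

    // Spin until System.nanoTime() reaches startTime + delayNanos (same idea as busySleep(startTime)).
    static void busySleep(long startTime, long delayNanos) {
        final long deadline = startTime + delayNanos;
        while (System.nanoTime() < deadline) {
            Thread.onSpinWait(); // Java 9+ hint that this is a spin loop
        }
    }

    // Read gzip-compressed lines and hand each to the sink, one record per delayNanos time slot.
    static int emitPaced(byte[] gzipped, long delayNanos, Consumer<String> sink) throws IOException {
        int emitted = 0;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gzipped)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                long startTime = System.nanoTime(); // slot starts before the per-record work
                sink.accept(line);                  // hypothetical stand-in for fromString + collectWithTimestamp
                busySleep(startTime, delayNanos);   // the work above counts toward the delay
                emitted++;
            }
        }
        return emitted;
    }

    // Build a small gzip payload in memory so the sketch needs no input file.
    static byte[] gzipLines(List<String> lines) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(String.join("\n", lines).getBytes(StandardCharsets.UTF_8));
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 1_000; i++) lines.add("ride-" + i);
        byte[] payload = gzipLines(lines);
        long t0 = System.nanoTime();
        int n = emitPaced(payload, 10_000, record -> { }); // 10,000 ns per record, i.e. a 100K rec/s target
        double seconds = (System.nanoTime() - t0) / 1e9;
        System.out.printf("emitted %d records at ~%.0f rec/s%n", n, n / seconds);
    }
}
```

Because the deadline is measured from before the work, a record whose parsing takes longer than the delay is emitted immediately. The rate then degrades to the work-bound rate instead of falling below it.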
Thanks!
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
--
https://felipeogutierrez.blogspot.com

On Fri, May 29, 2020 at 12:46 PM Robert Metzger <[hidden email]> wrote:
>
> Hi Felipe,
>
> the file is only 80 MB. It is probably cached in the Linux page cache, so there should not be any disk I/O involved.
> So what you are saying is that you cannot increase the throughput any further for sleeps shorter than 2000 nanoseconds.
> Have you tried running this without any sleep / System.nanoTime() calls? These calls can also be expensive.
> Running the source in a simple while loop should give you the theoretical maximum.
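One rough way to check the cost of the timing calls is to time a tight loop of System.nanoTime() calls. This is a hedged sketch using a hypothetical class, `NanoTimeCost`, and it is not a rigorous benchmark. JIT warm-up, CPU frequency scaling and the OS clock source all affect the number, and a harness such as JMH gives more trustworthy results.

```java
public class NanoTimeCost {

    // Written once after the loop so the accumulated value has an observable use.
    static volatile long blackhole;

    // Crude average cost of one System.nanoTime() call over `calls` invocations.
    static double averageNanoTimeCost(int calls) {
        long sink = 0;
        long t0 = System.nanoTime();
        for (int i = 0; i < calls; i++) {
            sink += System.nanoTime();
        }
        long elapsed = System.nanoTime() - t0;
        blackhole = sink;
        return (double) elapsed / calls;
    }

    public static void main(String[] args) {
        averageNanoTimeCost(1_000_000); // warm-up pass so the loop gets JIT-compiled
        double ns = averageNanoTimeCost(10_000_000);
        System.out.printf("~%.1f ns per System.nanoTime() call (~%.0fM calls/s)%n", ns, 1e3 / ns);
    }
}
```

The per-record loops in this thread call System.nanoTime() at least twice per record. If one call costs a few tens of nanoseconds, that overhead is already a noticeable fraction of a 500 ns or 1000 ns budget.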
>
> If you really want to generate data at high speed, I would pre-generate a dataset on the heap (do not run any RNG, as it will waste CPU cycles) and emit that.
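Here is a minimal sketch of the pre-generated-dataset idea. It uses two stand-ins:

* a hypothetical `Ride` class in place of TaxiRide
* a `Consumer<Ride>` in place of the Flink SourceContext

The array is built once with deterministic values. It is then emitted in a plain loop with no sleeps, no clock reads, no parsing and no RNG, which approximates the theoretical maximum emission rate of a simple while loop.

```java
import java.util.function.Consumer;

public class PregeneratedSource {

    // Hypothetical stand-in for TaxiRide; the fields are illustrative only.
    static final class Ride {
        final long id;
        final long eventTime;
        final double fare;

        Ride(long id, long eventTime, double fare) {
            this.id = id;
            this.eventTime = eventTime;
            this.fare = fare;
        }
    }

    // Build the whole dataset once, up front, with deterministic values (no RNG while emitting).
    static Ride[] pregenerate(int size) {
        Ride[] rides = new Ride[size];
        for (int i = 0; i < size; i++) {
            rides[i] = new Ride(i, 1_000L * i, 5.0 + (i % 50));
        }
        return rides;
    }

    // Emit `count` records by cycling over the array: no sleeps, clock reads, or parsing in the loop.
    static long emitUnthrottled(Ride[] rides, long count, Consumer<Ride> sink) {
        for (long i = 0; i < count; i++) {
            sink.accept(rides[(int) (i % rides.length)]);
        }
        return count;
    }

    public static void main(String[] args) {
        Ride[] rides = pregenerate(100_000);
        long[] checksum = {0};
        long t0 = System.nanoTime();
        // Summing the ids gives the loop an observable result.
        long n = emitUnthrottled(rides, 50_000_000L, r -> checksum[0] += r.id);
        double seconds = (System.nanoTime() - t0) / 1e9;
        System.out.printf("%d records in %.2f s (~%.1fM rec/s), checksum %d%n",
                n, seconds, n / seconds / 1e6, checksum[0]);
    }
}
```

Cycling over the array lets a small heap footprint produce an arbitrarily long stream. Event timestamps would then repeat, so a real benchmark would likely need to shift them on each pass.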
>
> In general: VisualVM is your friend in understanding where you are losing cycles.
>
> Best,
> Robert
>
>
> On Thu, May 28, 2020 at 12:06 AM Felipe Gutierrez <[hidden email]> wrote:
>>
>> Hi,
>>
>> I am trying to benchmark a stream application in Flink. I am using
>> a SourceFunction that reads events from the NYC Taxi Rides dataset
>> (http://training.ververica.com/trainingData/nycTaxiRides.gz), and I
>> control the emission rate with System.nanoTime(). I am not using
>> Thread.sleep() because Java does not guarantee when the thread
>> will be woken up.
>>
>> public void busySleep() {
>>     final long startTime = System.nanoTime();
>>     // spin until delayInNanoSeconds have elapsed since this call started
>>     while (System.nanoTime() - startTime < this.delayInNanoSeconds) ;
>> }
>>
>> So, when I wait 10,000 nanoseconds per record I should get a workload of
>> 100K rec/sec. When I wait 2,000 nanoseconds I should get 500K rec/sec,
>> for 1,000 nanoseconds 1M rec/sec, and for 500 nanoseconds 2M rec/sec.
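The rates above follow from 1e9 / delay and assume that the wait is the only per-record cost.

* In the no-argument busySleep() above, the timer starts when busySleep() is called. Any reading, parsing and emitting happens outside that window, so each record takes roughly work + delay.
* The later busySleep(startTime) version measures the deadline from before the work, which gives roughly max(work, delay).

The simplified model below puts numbers on this. It assumes a hypothetical 500 ns of per-record work and ignores the cost of the nanoTime() calls themselves.

```java
public class PacingModel {

    // No-argument busySleep(): the timer starts after the per-record work, so period = work + delay.
    static double rateDelayAfterWork(long workNanos, long delayNanos) {
        return 1e9 / (workNanos + delayNanos);
    }

    // busySleep(startTime): the deadline is measured from before the work, so period = max(work, delay).
    static double rateDeadlineFromStart(long workNanos, long delayNanos) {
        return 1e9 / Math.max(workNanos, delayNanos);
    }

    public static void main(String[] args) {
        long work = 500;     // hypothetical per-record cost (read + parse + emit)
        long delay = 1_000;  // delay chosen for a 1M rec/s target
        System.out.printf("ideal:               %.0f rec/s%n", 1e9 / delay);
        System.out.printf("delay after work:    %.0f rec/s%n", rateDelayAfterWork(work, delay));
        System.out.printf("deadline from start: %.0f rec/s%n", rateDeadlineFromStart(work, delay));
    }
}
```

With these numbers, the first model gives about 666,667 rec/s and the second gives 1,000,000 rec/s. The model therefore accounts for a 1M rec/s target falling short even without any I/O bottleneck.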
>>
>> The problem I am facing is that when I set the workload to 1M
>> rec/sec, it does not seem to generate records at that rate. I guess it is
>> because it spends more time reading the TaxiRide file or doing
>> I/O operations, or maybe it is some Java limitation.
>> If I use a message broker, it will add one more middleware layer
>> with its own read/write I/O operations, and I guess it will be worse.
>> What do you recommend for running a controllable benchmark for stream processing?
>>
>> Thanks,
>> Felipe
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> --
>> https://felipeogutierrez.blogspot.com