Flink execution time benchmark

Flink execution time benchmark

Giacomo Licari
Hi guys,
I'm trying to execute a benchmark reading a 1GB file and applying a reduce function.

I'd like to get the overall execution time.

I put this code before applying the reduce function:
long startTime = System.currentTimeMillis();

and after env.execute("Flink Processor"); I put the code to print out the overall time, but it doesn't work.

Has anyone tried something similar?

Thanks a lot,
Giacomo

Re: Flink execution time benchmark

Robert Metzger
Hi,

The execute() method returns an execution result object that also contains the runtime of the job.
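A minimal sketch of both measurements, using only the JDK. runJob is a hypothetical stand-in for a blocking env.execute(...) call, and JobResult is a hypothetical stand-in for Flink's JobExecutionResult, whose runtime Flink exposes through getNetRuntime() (in milliseconds). The wall-clock start time is taken immediately before the blocking call: defining operators such as the reduce only builds the program, and nothing runs until execute() is called.

```java
import java.util.concurrent.TimeUnit;

public class ExecutionTiming {

    /** Hypothetical stand-in for Flink's JobExecutionResult. */
    static final class JobResult {
        private final long netRuntimeMillis;

        JobResult(long netRuntimeMillis) {
            this.netRuntimeMillis = netRuntimeMillis;
        }

        long getNetRuntime() {
            return netRuntimeMillis;
        }
    }

    /** Hypothetical stand-in for env.execute(...): blocks until the "job" is done. */
    static JobResult runJob(long workMillis) {
        long start = System.nanoTime();
        try {
            Thread.sleep(workMillis); // placeholder for the actual job work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return new JobResult(elapsed);
    }

    public static void main(String[] args) {
        // Wall-clock measurement: start the clock right before the blocking call.
        long wallStart = System.nanoTime();
        JobResult result = runJob(200);
        long wallMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - wallStart);

        // Runtime reported by the result object itself.
        System.out.println("Reported runtime: " + result.getNetRuntime() + " ms");
        System.out.println("Wall-clock time:  " + wallMillis + " ms");
    }
}
```

Because execute() only returns once the job has finished, both numbers are only available for jobs that actually terminate.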

> On 21.03.2015, at 11:09, Giacomo Licari <[hidden email]> wrote:

Re: Flink execution time benchmark

Giacomo Licari
Hi Robert,
I can see the timestamp when the process starts, but the job never seems to stop executing.

I have:
        DataStream<Double> dataStream = env
                .readTextStream(filePath)             
                .flatMap(new Splitter());                 
               
        WindowedDataStream<Double> dataWindow = dataStream.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(5, TimeUnit.SECONDS));

        DataStream<Double> res = dataWindow.reduceGroup(new Reducer());
        env.execute("Flink Processor");

Thank you,
Giacomo

On Sat, Mar 21, 2015 at 1:19 PM, Robert Metzger <[hidden email]> wrote:


Re: Flink execution time benchmark

Márton Balassi
Hi Giacomo,

You are currently using the Flink Streaming API. Is that your intention or would you like to measure batch execution?

Regarding your code: StreamExecutionEnvironment.readTextStream(filePath) monitors a file or directory and streams the updates to that location [1], potentially indefinitely, so that job is not expected to stop.

If you want to read a text file with the streaming API, you need to use StreamExecutionEnvironment.readTextFile(filePath); the equivalent for the batch API is ExecutionEnvironment.readTextFile(filePath).
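The difference between readTextStream and readTextFile is essentially the difference between an unbounded and a bounded input. As a JDK-only analogy (plain java.util.stream, not Flink code): a stream over a file's lines is finite, so count() returns, while Stream.generate produces an endless stream, and count() on it would never return unless the stream is cut off explicitly. A job that keeps monitoring its input behaves like the second case, so any code waiting for it to finish keeps waiting. The tempFileWith helper is a hypothetical convenience for the demo.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.stream.Stream;

public class BoundedVsUnbounded {

    /** Bounded input: the file is read once, so the terminal count() returns. */
    static long countLinesOnce(Path file) {
        try (Stream<String> lines = Files.lines(file)) {
            return lines.count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Unbounded input: Stream.generate never ends on its own. Only the explicit
     * limit(...) lets count() return; without it, this call would never finish.
     */
    static long countGenerated(long limit) {
        return Stream.generate(() -> "line").limit(limit).count();
    }

    /** Hypothetical demo helper: writes the given lines to a temporary file. */
    static Path tempFileWith(String... lines) {
        try {
            Path file = Files.createTempFile("input", ".txt");
            file.toFile().deleteOnExit();
            return Files.write(file, Arrays.asList(lines));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(countLinesOnce(tempFileWith("a b", "c d", "e f"))); // 3
        System.out.println(countGenerated(5)); // 5
    }
}
```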

If you want to measure the performance of the streaming framework, I would not necessarily recommend reading from a file, as your benchmark will potentially be bound by disk I/O. Reading from a local socket, or even generating the data in a SourceFunction, might be more suitable. For the batch version, a file is of course a standard input.
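To take disk I/O out of the measurement, the input can be produced in memory. The sketch below uses only the JDK: SyntheticWords is a hypothetical generator (not a Flink SourceFunction) that emits a fixed number of words drawn with a seeded java.util.Random, so repeated runs see identical input, and the word count then runs purely on the CPU. Here the data is generated before the timed region to isolate the processing cost; inside a streaming source, generation would instead be part of the job.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class SyntheticWords {

    private static final String[] VOCABULARY = {"flink", "stream", "batch", "window", "reduce"};

    /** Hypothetical generator: deterministic for a given seed, no disk access. */
    static String[] generate(int count, long seed) {
        Random random = new Random(seed);
        String[] words = new String[count];
        for (int i = 0; i < count; i++) {
            words[i] = VOCABULARY[random.nextInt(VOCABULARY.length)];
        }
        return words;
    }

    /** The measured work: a plain in-memory word count. */
    static Map<String, Integer> wordCount(String[] words) {
        Map<String, Integer> counts = new HashMap<>();
        for (String w : words) {
            counts.merge(w, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Generated up front, outside the timed region.
        String[] input = generate(1_000_000, 42L);

        long start = System.nanoTime();
        Map<String, Integer> counts = wordCount(input);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        System.out.println(counts + " in " + elapsedMs + " ms");
    }
}
```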

With a pull request that I have just merged, the env.execute() methods of both the batch and the streaming APIs return a JobExecutionResult, from which you can get the execution time as Robert suggested. [1] To get this, please depend on the latest master, 0.9-SNAPSHOT. If you are testing with an older stable version, you can measure the time from the command line:

time (bin/flink run your-jar.jar ... )


Best,

Marton 

On Sun, Mar 22, 2015 at 5:56 PM, Giacomo Licari <[hidden email]> wrote:



Re: Flink execution time benchmark

Saleh
Hi everyone,

I am using the Flink Streaming API and trying to measure the execution time of a word count job. I tried the method mentioned in Robert's and Marton's posts (using the JobExecutionResult object), as shown below, but I cannot see the result, because no code after env.execute() gets executed.

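        // Blocks until the job terminates, so it never returns while an unbounded job keeps running.
        JobExecutionResult execResult = env.execute();
        System.out.println("Execution time = " + execResult.getNetRuntime() + " ms");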


I have also tried StreamExecutionEnvironment.execute("flinkJob"). The API documentation states that the program execution will be logged and displayed under the provided name, "flinkJob" in this case. My question is: where can I find this log file? I am running the Flink jobs in Eclipse with Maven plugins, using only my local machine.

Cheers.