Logs meaning states

Juan Fumero
Hi,
I am starting with Flink. I have looked through the documentation, but I haven't found it clear.

I am wondering about the difference between these two states:

FlatMap RUNNING vs. DataSink RUNNING.

Is FlatMap doing any data transformation? Compilation? At which point is the function provided in the MapFunction actually executed? How can I measure exactly the time of the kernel computation?

It seems to use only one thread in this step, even though I specified 16 threads in createLocalEnvironment.

CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to RUNNING

Only one thread runs here, for almost 35 seconds.

The rest of the execution is very fast (less than one second for computing the square of an array of 500,000 integer elements).

Thanks
Juan

Here is the full log:

06/29/2015 14:13:25 Job execution switched to status RUNNING.
06/29/2015 14:13:25 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to SCHEDULED
06/29/2015 14:13:25 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to DEPLOYING
06/29/2015 14:13:26 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to RUNNING
06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to SCHEDULED
06/29/2015 14:14:01 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to FINISHED
06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to DEPLOYING
06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to RUNNING
06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to FINISHED
06/29/2015 14:14:01 Job execution switched to status FINISHED.



Re: Logs meaning states

Stephan Ewen
Hi Juan!

This is an artifact of a workaround right now. The actual collect() logic happens in the flatMap(), and the sink is a dummy that executes nothing. The flatMap writes the data to be collected into the "accumulator" that delivers it back to the client.

Greetings,
Stephan
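Juan also asked how to time the kernel computation. Since, as explained above, the collect() work happens inside the chained flatMap, one way to keep the collect() path out of the measurement is to send the mapped data to a sink that discards it and read the job's net runtime. This is a sketch that assumes the Flink 1.x Java DataSet API (the thread concerns 0.9, where some classes may differ); `Square` is a hypothetical stand-in for the real kernel, and the net runtime still includes scheduling and deployment, not only the user function.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class TimeKernel {

    // Hypothetical kernel: squares each element (widened to long to avoid overflow).
    public static final class Square implements MapFunction<Integer, Long> {
        @Override
        public Long map(Integer value) {
            return (long) value * value;
        }
    }

    /** Runs source -> map -> discarding sink and returns the job's net runtime in ms (n must be > 0). */
    public static long netRuntimeMillis(int parallelism, int n) {
        List<Integer> input = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            input.add(i);
        }
        try {
            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
            env.fromCollection(input)
                    .map(new Square())
                    // No collect(): results are dropped, so nothing is shipped back via an accumulator.
                    .output(new DiscardingOutputFormat<Long>());
            JobExecutionResult result = env.execute("time kernel");
            return result.getNetRuntime();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Net runtime: " + netRuntimeMillis(16, 500_000) + " ms");
    }
}
```

Comparing this number with the runtime of the same job ending in collect() gives a rough idea of how much time the collect() path itself costs.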


(In reply to Juan Fumero, Mon, Jun 29, 2015 at 2:30 PM)




Re: Logs meaning states

Juan Fumero
Hi Stephan,
So should I use another method instead of collect()? It seems
multithreading does not work with it.

Juan

(In reply to Stephan Ewen, Mon, 2015-06-29 at 14:51 +0200)


Re: Logs meaning states

Stephan Ewen
In general, avoid collect() if you can. It brings the data to the client, where the computation is no longer parallel.

Try to do as much on the DataSet as possible.
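A sketch of keeping the work on the DataSet when only a summary is needed: aggregate before collecting, so that collect() ships one value instead of every element. It assumes the Flink 1.x Java DataSet API; `Square` and `Sum` are hypothetical functions, and `rebalance()` spreads the records coming from the parallelism-1 collection source over all map instances.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class AggregateFirst {

    // Hypothetical per-element work: square each value as a long to avoid overflow.
    public static final class Square implements MapFunction<Integer, Long> {
        @Override
        public Long map(Integer value) {
            return (long) value * value;
        }
    }

    // Associative sum, so partial sums can be computed by each parallel instance.
    public static final class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }

    /** Squares and sums on the DataSet; only the single total is collected. Input must be non-empty. */
    public static long sumOfSquares(int parallelism, List<Integer> input) {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
            List<Long> total = env.fromCollection(input)
                    .rebalance()           // spread records from the parallelism-1 source
                    .map(new Square())
                    .reduce(new Sum())
                    .collect();            // ships one element, not input.size() elements
            return total.get(0);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            input.add(i);
        }
        System.out.println(sumOfSquares(4, input)); // prints 14 (1 + 4 + 9)
    }
}
```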

(In reply to Juan Fumero, Mon, Jun 29, 2015 at 2:58 PM)



Re: Logs meaning states

Juan Fumero
Is there any other way to apply the function in parallel and return the
result to the client in parallel?

Thanks
Juan
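One possible answer to this question, assuming the results do not have to end up in the client's memory: let every parallel instance write its own output with `writeAsText`. When the sink runs with a parallelism greater than 1, Flink writes a directory with one file per instance instead of a single file. This sketch assumes the Flink 1.x Java DataSet API; the class and helper names (`ParallelWrite`, `Square`, `countLines`) are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ParallelWrite {

    // Hypothetical per-element work: square each value as a long.
    public static final class Square implements MapFunction<Integer, Long> {
        @Override
        public Long map(Integer value) {
            return (long) value * value;
        }
    }

    /** Writes the squares with `parallelism` sink instances and returns the output path. */
    public static Path writeSquares(int parallelism, List<Integer> input) {
        try {
            // A path that does not exist yet; the default write mode does not overwrite.
            Path out = Files.createTempDirectory("squares").resolve("out");
            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
            env.fromCollection(input)
                    .rebalance()
                    .map(new Square())
                    .writeAsText(out.toUri().toString()); // parallelism > 1: one file per instance
            env.execute("write squares");
            return out;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Counts output lines, whether Flink wrote a single file or a directory of part files. */
    public static long countLines(Path out) {
        try (Stream<Path> parts = Files.isDirectory(out) ? Files.list(out) : Stream.of(out)) {
            long lines = 0;
            for (Path part : parts.collect(Collectors.toList())) {
                lines += Files.readAllLines(part).size();
            }
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 500_000; i++) {
            input.add(i);
        }
        Path out = writeSquares(16, input);
        System.out.println(countLines(out) + " lines under " + out);
    }
}
```

The client then only learns where the results are; reading them back is a separate step that can itself be parallelized.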

(In reply to Stephan Ewen, Mon, 2015-06-29 at 15:01 +0200)


Re: Logs meaning states

Stephan Ewen
It is not easy to understand what you are trying to do.

Can you post your program here? Then we can take a look and give you a good answer...

(In reply to Juan Fumero, Mon, Jun 29, 2015 at 3:47 PM)



Re: Logs meaning states

Juan Fumero
Yeah, sorry. I would like to do something simple like this, but running on multiple Java threads.

DataSet<Tuple2<Integer, Integer>> input = env.fromCollection(in);
DataSet<Integer> output = input.map(new HighWorkLoad());
ArrayList<Integer> result = output.consume(); // ?  like collect but in parallel, some operation that consumes the pipeline.
return result;

Cheers
Juan


(In reply to Stephan Ewen, Mon, 2015-06-29 at 16:04 +0200)






Re: Logs meaning states

Stephan Ewen
Ah, thank you! 

 - If you create a data set from a Java/Scala collection, this data source has parallelism one.
 - The map function is chained to that source, so it runs with parallelism one as well.
 - To run it with a higher parallelism, call "setParallelism(...)" on the map function, or call "rebalance()" after the source.
 - collect() gathers the results in parallel, but ultimately has a non-parallel code path, where the results are concatenated and transferred to the client.


BTW: The execution plan and the web frontend are helpful to diagnose such things.
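The suggestions above can be combined into one sketch, assuming the Flink 1.x Java DataSet API. `HighWorkLoad` here is a hypothetical stand-in for Juan's function, and in the DataSet API the round-robin redistribution after the source is the `rebalance()` method. The `plan(...)` helper uses `ExecutionEnvironment.getExecutionPlan()`, which returns the optimizer's plan as a JSON string listing each operator with its parallelism, so a map stuck at parallelism one can be spotted without running the job.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

public class ParallelMap {

    // Hypothetical stand-in for Juan's HighWorkLoad: multiplies the two tuple fields.
    public static final class HighWorkLoad implements MapFunction<Tuple2<Integer, Integer>, Integer> {
        @Override
        public Integer map(Tuple2<Integer, Integer> t) {
            return t.f0 * t.f1;
        }
    }

    // Builds source -> map; the collection source itself always runs with parallelism 1.
    private static DataSet<Integer> pipeline(ExecutionEnvironment env, List<Tuple2<Integer, Integer>> in,
                                             int parallelism, boolean useRebalance) {
        DataSet<Tuple2<Integer, Integer>> input = env.fromCollection(in);
        return useRebalance
                ? input.rebalance().map(new HighWorkLoad())                  // round-robin after the source
                : input.map(new HighWorkLoad()).setParallelism(parallelism); // explicit map parallelism
    }

    /** Runs the pipeline and collects the results (element order is not guaranteed). */
    public static List<Integer> run(int parallelism, List<Tuple2<Integer, Integer>> in, boolean useRebalance) {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
            return pipeline(env, in, parallelism, useRebalance).collect();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the JSON plan without running the job; a plan needs a sink, so results are discarded. */
    public static String plan(int parallelism, List<Tuple2<Integer, Integer>> in, boolean useRebalance) {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
            pipeline(env, in, parallelism, useRebalance).output(new DiscardingOutputFormat<Integer>());
            return env.getExecutionPlan();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        List<Tuple2<Integer, Integer>> in = new ArrayList<>();
        for (int i = 0; i < 500_000; i++) {
            in.add(new Tuple2<>(i, 2));
        }
        System.out.println(plan(16, in, true)); // inspect each operator's "parallelism" entry
        System.out.println(run(16, in, true).size() + " results");
    }
}
```

Either variant still ends in collect(), so the final transfer of the results to the client remains non-parallel.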


(In reply to Juan Fumero, Mon, Jun 29, 2015 at 4:14 PM)