Re: Logs meaning states

Posted by Stephan Ewen on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Logs-meaning-states-tp1829p1837.html

Ah, thank you! 

 - If you create a data set from a Java/Scala collection, this data source has parallelism one.
 - The map function is chained to that source, so it runs with parallelism one as well.
 - To run it with a higher parallelism, call "setParallelism(...)" on the map operator, or call "rebalance()" after the source.
 - collect() gathers the results in parallel, but ultimately goes through a non-parallel code path when the results are concatenated and transferred to the client.
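
Outside Flink, the effect of those points can be pictured with a plain-Java sketch. It is an analogy only, not Flink's runtime: a single source list (parallelism one) is redistributed round-robin to a fixed number of worker threads, each worker applies the map function to its share, and the calling thread then concatenates the partial results one after another, which is the non-parallel part. The class name, the `parallelism` argument and the squaring function are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntUnaryOperator;

// Hypothetical plain-Java analogy (not Flink code): redistribute a single source
// round-robin, map in parallel, then concatenate the results on one thread.
class RoundRobinMapSketch {

    static List<Integer> run(List<Integer> source, int parallelism, IntUnaryOperator fn) {
        // 1. Redistribute: element i goes to worker (i % parallelism).
        List<List<Integer>> partitions = new ArrayList<>();
        for (int w = 0; w < parallelism; w++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < source.size(); i++) {
            partitions.get(i % parallelism).add(source.get(i));
        }

        // 2. Parallel map: each worker thread transforms its own partition.
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<List<Integer>>> futures = new ArrayList<>();
            for (List<Integer> part : partitions) {
                futures.add(pool.submit(() -> {
                    List<Integer> out = new ArrayList<>(part.size());
                    for (int x : part) {
                        out.add(fn.applyAsInt(x));
                    }
                    return out;
                }));
            }
            // 3. Non-parallel step: the calling thread ("client") concatenates
            //    the partial results one after another.
            List<Integer> result = new ArrayList<>(source.size());
            for (Future<List<Integer>> f : futures) {
                result.addAll(f.get());
            }
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            source.add(i);
        }
        System.out.println(run(source, 4, x -> x * x)); // prints [0, 16, 64, 1, 25, 81, 4, 36, 9, 49]
    }
}
```

Round-robin redistribution does not preserve source order, so the result comes back grouped by worker.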


BTW: the execution plan and the web frontend are helpful for diagnosing issues like this.
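
Stephan's earlier advice further down in the thread, to avoid collect() and do as much as possible on the DataSet, boils down to shrinking what has to travel to the client. A plain-Java analogy, assuming a sum of squares as a stand-in aggregation: each worker reduces its slice to a single partial value, so the client merges `parallelism` numbers instead of receiving every element. The class and method names are hypothetical and this is not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical analogy (not Flink code): aggregate on the workers and ship only
// one partial result per worker, instead of shipping every element to the client.
class ReduceBeforeShippingSketch {

    static long sumOfSquares(int[] data, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            int chunk = (data.length + parallelism - 1) / parallelism; // ceiling division
            List<Future<Long>> partials = new ArrayList<>();
            for (int w = 0; w < parallelism; w++) {
                final int from = Math.min(data.length, w * chunk);
                final int to = Math.min(data.length, from + chunk);
                partials.add(pool.submit(() -> {
                    long sum = 0;
                    for (int i = from; i < to; i++) {
                        sum += (long) data[i] * data[i];
                    }
                    return sum; // a single value leaves this worker
                }));
            }
            // The "client" merges `parallelism` numbers, not data.length elements.
            long total = 0;
            for (Future<Long> f : partials) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(new int[] {1, 2, 3, 4, 5}, 2)); // prints 55
    }
}
```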


On Mon, Jun 29, 2015 at 4:14 PM, Juan Fumero <[hidden email]> wrote:
Yeah, sorry. I would like to do something simple like this, but using Java Threads.

DataSet<Tuple2<Integer, Integer>> input = env.fromCollection(in);
DataSet<Integer> output = input.map(new HighWorkLoad());
ArrayList<Integer> result = output.consume(); // hypothetical: like collect(), but in parallel; some operation that consumes the pipeline
return result;

Cheers
Juan
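
For comparison, here is what a "Java threads" version of Juan's snippet might look like in plain Java, as a minimal sketch. It assumes `in` is a list of plain integers rather than `Tuple2` values, and replaces `HighWorkLoad` with a hypothetical squaring function (computed as `long` to avoid overflow for inputs around 500000). A parallel stream spreads the map over the common fork-join pool, but the results still end up in one list on the calling thread, so the final hand-over is again a single point.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical plain-Java version of Juan's snippet, using a parallel stream.
class ThreadsVersionSketch {

    // Stand-in for HighWorkLoad: squares the input, widened to long to avoid int overflow.
    static long highWorkLoad(int x) {
        return (long) x * x;
    }

    // Roughly what the wished-for "consume()" would do: map in parallel, gather into one list.
    static List<Long> consume(List<Integer> in) {
        return in.parallelStream()                 // work is split over the common ForkJoinPool
                 .map(x -> highWorkLoad(x))
                 .collect(Collectors.toList());    // encounter order of `in` is preserved
    }

    public static void main(String[] args) {
        List<Integer> in = IntStream.range(0, 500_000).boxed().collect(Collectors.toList());
        List<Long> result = consume(in);
        System.out.println(result.size() + " " + result.get(499_999)); // prints 500000 249999000001
    }
}
```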



On Mon, 2015-06-29 at 16:04 +0200, Stephan Ewen wrote:
It is not easy to understand what you are trying to do.


Can you post your program here? Then we can take a look and give you a good answer...

On Mon, Jun 29, 2015 at 3:47 PM, Juan Fumero <[hidden email]> wrote:
Is there any other way to apply the function in parallel and return the
result to the client in parallel?

Thanks
Juan

On Mon, 2015-06-29 at 15:01 +0200, Stephan Ewen wrote:
> In general, avoid collect if you can. Collect brings the data to the
> client, where the computation is no longer parallel.
>
>
> Try to do as much on the DataSet as possible.
>
> On Mon, Jun 29, 2015 at 2:58 PM, Juan Fumero
> <[hidden email]> wrote:
>         Hi Stephan,
>           so should I use another method instead of collect? It seems
>         multithreading is not working with this.
>
>
>         Juan
>
>         On Mon, 2015-06-29 at 14:51 +0200, Stephan Ewen wrote:
>         > Hi Juan!
>         >
>         >
>         > This is an artifact of a workaround right now. The actual
>         > collect() logic happens in the flatMap() and the sink is a
>         > dummy that executes nothing. The flatMap writes the data to be
>         > collected to the "accumulator" that delivers it back.
>         >
>         >
>         > Greetings,
>         > Stephan
>         >
>         >
>         >
>         > On Mon, Jun 29, 2015 at 2:30 PM, Juan Fumero
>         > <[hidden email]> wrote:
>         >         Hi,
>         >           I am starting with Flink. I have tried to look in the
>         >         documentation, but I haven't found it clear.
>         >
>         >         I wonder what the difference is between these two states:
>         >
>         >         FlatMap RUNNING vs DataSink RUNNING.
>         >
>         >         Is FlatMap doing any data transformation? Compilation?
>         >         At which point is the function provided in the
>         >         MapFunction actually executed? How can I measure exactly
>         >         how long the kernel computation takes?
>         >
>         >         It seems to be using only one thread in this step, even
>         >         though I specified 16 threads in createLocalEnvironment.
>         >
>         >         CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to RUNNING
>         >
>         >         Here only one thread is running, for almost 35 seconds.
>         >
>         >         The rest of the execution is very fast (less than one
>         >         second for computing the square of an array of 500000
>         >         integer elements).
>         >
>         >         Thanks
>         >         Juan
>         >
>         >         Here is the full log.
>         >
>         >         06/29/2015 14:13:25 Job execution switched to status RUNNING.
>         >         06/29/2015 14:13:25 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to SCHEDULED
>         >         06/29/2015 14:13:25 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to DEPLOYING
>         >         06/29/2015 14:13:26 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to RUNNING
>         >         06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to SCHEDULED
>         >         06/29/2015 14:14:01 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to FINISHED
>         >         06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to DEPLOYING
>         >         06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to RUNNING
>         >         06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to FINISHED
>         >         06/29/2015 14:14:01 Job execution switched to status FINISHED.
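
The log is consistent with Stephan's explanation of the collect() workaround: the chain ending in FlatMap (collect()) runs from 14:13:26 to 14:14:01, while the DataSink (collect() sink) is scheduled, deployed, run and finished within the same second. The plain-Java sketch below illustrates that pattern under stated assumptions: the task that applies the map function also hands its output to a shared accumulator, the sink receives nothing and does nothing, and the client reads the accumulator after all tasks have finished. All names are hypothetical; this is not Flink's internal code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of "collect() via accumulator + dummy sink" (not Flink internals).
class AccumulatorCollectSketch {

    // Shared "accumulator": subtask index -> records that subtask collected.
    private final Map<Integer, List<Integer>> accumulator = new ConcurrentHashMap<>();

    // Stand-in for the chained Map -> FlatMap (collect()) task: the real work happens here.
    private void mapAndCollect(int subtask, List<Integer> partition) {
        List<Integer> buffer = new ArrayList<>(partition.size());
        for (int x : partition) {
            buffer.add(x * x); // hypothetical map function (squaring)
        }
        accumulator.put(subtask, buffer); // results go to the accumulator, nothing is emitted downstream
    }

    // Stand-in for the dummy DataSink: it receives no records and does nothing.
    private void dummySink() {
    }

    // Client side, after the job: concatenate the accumulator contents in subtask order.
    private List<Integer> resultOnClient(int parallelism) {
        List<Integer> result = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            result.addAll(accumulator.getOrDefault(s, Collections.emptyList()));
        }
        return result;
    }

    static List<Integer> runJob(List<List<Integer>> partitions) {
        AccumulatorCollectSketch job = new AccumulatorCollectSketch();
        Thread[] tasks = new Thread[partitions.size()];
        for (int s = 0; s < tasks.length; s++) {
            final int subtask = s;
            tasks[s] = new Thread(() -> job.mapAndCollect(subtask, partitions.get(subtask)));
            tasks[s].start();
        }
        try {
            for (Thread t : tasks) {
                t.join(); // wait until every "FlatMap" subtask has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        job.dummySink(); // the sink stage: near-instant because it does nothing
        return job.resultOnClient(tasks.length);
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4, 5));
        System.out.println(runJob(partitions)); // prints [1, 4, 9, 16, 25]
    }
}
```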