Hi,
I am getting started with Flink. I have looked through the documentation, but I have not found it clear on this point.

I would like to understand the difference between these two states:

FlatMap RUNNING vs DataSink RUNNING.

Is the FlatMap doing any data transformation? Compilation? At which point is the function provided in the MapFunction actually executed? How can I measure exactly the time spent in the kernel computation?

It seems to use only one thread in this step, even though I specified 16 threads in createLocalEnvironment.

CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to RUNNING

This step runs on only one thread for almost 35 seconds.

The rest of the execution is very fast (less than one second to compute the squares of an array of 500000 integers).

Thanks
Juan

Here is the full log.

06/29/2015 14:13:25 Job execution switched to status RUNNING.
06/29/2015 14:13:25 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to SCHEDULED
06/29/2015 14:13:25 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to DEPLOYING
06/29/2015 14:13:26 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to RUNNING
06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to SCHEDULED
06/29/2015 14:14:01 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to FINISHED
06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to DEPLOYING
06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to RUNNING
06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to FINISHED
06/29/2015 14:14:01 Job execution switched to status FINISHED.
|
Hi Juan!

This is an artifact of a workaround right now. The actual collect() logic happens in the flatMap() and the sink is a dummy that executes nothing. The flatMap writes the data to be collected to the "accumulator" that delivers it back.

Greetings,
Stephan

(In reply to Juan Fumero, Mon, Jun 29, 2015 at 2:30 PM)
|
Hi Stephan,
so should I use another method instead of collect? It seems multithreading does not work with it.

Juan

(In reply to Stephan Ewen, Mon, 2015-06-29 at 14:51 +0200)
|
In general, avoid collect if you can. Collect brings the data to the client, where the computation is no longer parallel.

Try to do as much on the DataSet as possible.

(In reply to Juan Fumero, Mon, Jun 29, 2015 at 2:58 PM)
|
Is there any other way to apply the function in parallel and return the result to the client in parallel?

Thanks
Juan

(In reply to Stephan Ewen, Mon, 2015-06-29 at 15:01 +0200)
|
It is not easy to understand what you are trying to do. Can you post your program here? Then we can take a look and give you a good answer...

(In reply to Juan Fumero, Mon, Jun 29, 2015 at 3:47 PM)
|
Yeah, sorry. I would like to do something simple like this, but using Java Threads.
    DataSet<Tuple2<Integer, Integer>> input = env.fromCollection(in);
    DataSet<Integer> output = input.map(new HighWorkLoad());
    ArrayList<Integer> result = output.consume(); // ? like collect but in parallel, some operation that consumes the pipeline.
    return result;

Cheers
Juan

(In reply to Stephan Ewen, Mon, 2015-06-29 at 16:04 +0200)
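
For comparison, here is a plain-Java sketch (no Flink involved) of the behaviour the snippet above seems to ask for: apply a function on several Java threads, then gather one list. The class name `CollectAnalogy` and the method `squareAll` are hypothetical, and squaring stands in for the real workload. Note that the final gathering loop runs on a single thread, which is the part that stays sequential however many workers compute the values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CollectAnalogy {

    // Squares every element using up to `threads` worker threads,
    // then concatenates the partial results on the calling thread.
    static List<Integer> squareAll(List<Integer> in, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Size of each contiguous slice, rounded up so every element is covered.
            int chunk = (in.size() + threads - 1) / threads;
            List<Future<List<Integer>>> parts = new ArrayList<>();
            for (int start = 0; start < in.size(); start += chunk) {
                final List<Integer> slice =
                        in.subList(start, Math.min(start + chunk, in.size()));
                // Parallel part: each slice is processed by one pool thread.
                parts.add(pool.submit(() -> {
                    List<Integer> out = new ArrayList<>(slice.size());
                    for (int v : slice) {
                        out.add(v * v);
                    }
                    return out;
                }));
            }
            // Sequential part: one thread waits for and concatenates all slices,
            // in submission order, so the input order is preserved.
            List<Integer> result = new ArrayList<>(in.size());
            for (Future<List<Integer>> part : parts) {
                result.addAll(part.get());
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(squareAll(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```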
|
Ah, thank you!

- If you create a data set from a Java/Scala collection, this data source has a parallelism of one.
- The map function is chained to that source, so it runs with parallelism one as well.
- To run it with a higher parallelism, use "setParallelism(...)" on the map operator, or call "rebalance()" after the source.
- The collect collects in parallel, but ultimately has a non-parallel code path, where the results are concatenated and transferred to the client.

BTW: The execution plan and the web frontend are helpful to diagnose such things.

(In reply to Juan Fumero, Mon, Jun 29, 2015 at 4:14 PM)
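
A minimal sketch of the advice in this reply, assuming a Flink version whose DataSet API provides `rebalance()` (used here as the call that redistributes records after the source) and `setParallelism(...)` on the map operator. The class name `ParallelMapSketch` and the `Square` function are hypothetical stand-ins for the `ApplyFunction` and `HighWorkLoad` code from the question, and the input is simplified to plain integers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

class ParallelMapSketch {

    // Hypothetical stand-in for the HighWorkLoad function from the question.
    static class Square implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) {
            return value * value;
        }
    }

    static List<Integer> squareInParallel(List<Integer> in, int parallelism) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);

        DataSet<Integer> output = env
                .fromCollection(in)            // collection source: runs with parallelism one
                .rebalance()                   // spread the records over the parallel map instances
                .map(new Square())
                .setParallelism(parallelism);  // run the map itself with the requested parallelism

        // collect() still ends in a non-parallel step: the results are
        // concatenated and shipped back to the client as one list.
        return output.collect();
    }

    public static void main(String[] args) throws Exception {
        List<Integer> result = new ArrayList<>(squareInParallel(Arrays.asList(1, 2, 3, 4), 4));
        // Sort before printing: the order of the collected elements may differ from the input order.
        Collections.sort(result);
        System.out.println(result);
    }
}
```

Because the records pass through several parallel instances, the collected list may not come back in input order; sort it or carry an index along if the order matters.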
|