Parallelism question


Giacomo Licari
Hi guys,
I have a question about how parallelism works.

If I have a large dataset and divide it into 5 blocks, can I pass each block of data to its own parallel process (for example, with 5 processes set up)?

And if the results from the processes do not arrive at the output in order, can I order them? For example:

data from process 1
data from process 2
and so on

Thank you guys!

Re: Parallelism question

Maximilian Michels
Hi Giacomo,

If I understand you correctly, you want your Flink job to execute with a parallelism of 5. Just call setDegreeOfParallelism(5) on your ExecutionEnvironment. That way, all operations, when possible, will be performed using 5 parallel instances. This is also true for the DataSink which will produce 5 files containing the output data from the parallel instances.

Best,
Max


(In reply to Giacomo Licari's message of Tue, Apr 14, 2015 at 10:38 AM.)

Re: Parallelism question

Giacomo Licari
Hi Max,
thank you for your reply.

Does the DataSink contain the data in order, i.e. output1, output2, ..., output5? Or are they mixed?

Thanks a lot,
Giacomo

(In reply to Maximilian Michels's message of Tue, Apr 14, 2015 at 11:58 AM.)

Re: Parallelism question

Maximilian Michels
Hi Giacomo,

If you use a FileOutputFormat as a DataSink (e.g. by calling writeAsText("/path") on your DataSet), then the output directory will contain 5 files named 1, 2, 3, 4, and 5, each containing the output of the corresponding task. The order of the data in the files follows the order of the distributed DataSet. You can locally sort a partition by a key using the sortPartition(..) command. This is only available in 0.9.0-milestone-1 and 0.9-SNAPSHOT.

Best,
Max



(In reply to Giacomo Licari's message of Tue, Apr 14, 2015 at 12:12 PM.)

Re: Parallelism question

Fabian Hueske-2
Hi Giacomo,

as Max said, you can sort the data within a partition.

However, data across partitions is not sorted. It is either randomly partitioned or hash-partitioned (all records that share some property are in the same partition). Producing fully ordered output, where the first partition has all values 1-10, the second 11-20, and so on, requires range partitioning, which is not yet supported by Flink.
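
To make the difference concrete, here is a small plain-Java sketch (it does not use Flink at all; the class and method names are made up for illustration). It assumes integer keys 1 to 50 and 5 partitions, assigns keys either by hash or by contiguous range, sorts each partition locally, and then checks whether reading the partitions one after another gives a globally ordered result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.IntUnaryOperator;

public class PartitionOrderSketch {

    /** Hash-style assignment: the partition depends on the key's hash, not on its rank. */
    static int hashPartition(int key, int numPartitions) {
        return Math.floorMod(Integer.hashCode(key), numPartitions);
    }

    /** Range-style assignment for keys in [min, max]: contiguous, equal-width key ranges. */
    static int rangePartition(int key, int min, int max, int numPartitions) {
        long width = (long) max - min + 1;
        return (int) (((long) key - min) * numPartitions / width);
    }

    /** Assigns keys to partitions, sorts each partition locally, then concatenates them in index order. */
    static List<Integer> partitionSortConcat(List<Integer> keys, int numPartitions, IntUnaryOperator assign) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (int key : keys) {
            parts.get(assign.applyAsInt(key)).add(key);
        }
        List<Integer> out = new ArrayList<>();
        for (List<Integer> part : parts) {
            Collections.sort(part); // local sort only, one partition at a time
            out.addAll(part);
        }
        return out;
    }

    /** Returns true if the list is in non-decreasing order. */
    static boolean isSorted(List<Integer> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1) > xs.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> keys = new ArrayList<>();
        for (int i = 50; i >= 1; i--) {
            keys.add(i);
        }
        List<Integer> hashed = partitionSortConcat(keys, 5, k -> hashPartition(k, 5));
        List<Integer> ranged = partitionSortConcat(keys, 5, k -> rangePartition(k, 1, 50, 5));
        System.out.println("hash + local sort globally ordered:  " + isSorted(hashed));
        System.out.println("range + local sort globally ordered: " + isSorted(ranged));
    }
}
```

With these inputs the hash-partitioned result is not globally ordered, while the range-partitioned one is, because partition 0 holds 1-10, partition 1 holds 11-20, and so on.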

(In reply to Maximilian Michels's message of 2015-04-14 6:22 GMT-05:00.)

Re: Parallelism question

Giacomo Licari
Hi Fabian,
thanks for your reply. My question was exactly about that problem, range partitioning.

I have to process a large dataset of values and apply a data mining algorithm to each partition, so for me it is important that the final result is ordered; otherwise the data loses its meaning.

(In reply to Fabian Hueske's message of Thu, Apr 16, 2015 at 3:35 PM.)

Re: Parallelism question

Fabian Hueske-2
You could try to work around this using a custom Partitioner [1].

myData.partitionCustom(new MyPartitioner(), "myPartitionField").sortPartition("myPartitionField", Order.ASCENDING).writeAsCsv(...);

In that case, you need to implement the partitioning function yourself. To do that "right" you need to know the value distribution of your partition key.
Unfortunately, custom Partitioners are not included in the documentation yet.
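
As a rough idea of what such a MyPartitioner could look like, here is a minimal sketch. To keep it compilable without Flink on the classpath, it declares a local stand-in interface with the same shape as Flink's Partitioner<K> (a single method, int partition(K key, int numPartitions)); in a real job you would implement org.apache.flink.api.common.functions.Partitioner instead. The boundary values 10, 20, 30, 40 are assumptions: they only make sense if you already know how your keys are distributed.

```java
import java.util.Arrays;

public class RangePartitionerSketch {

    /** Local stand-in mirroring the shape of Flink's Partitioner<K>, so the sketch runs without Flink. */
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Hypothetical range partitioner driven by known upper bounds of the key ranges. */
    static class MyPartitioner implements Partitioner<Integer> {
        // Sorted, inclusive upper bound of every partition except the last one.
        private final int[] upperBounds;

        MyPartitioner(int... upperBounds) {
            this.upperBounds = upperBounds.clone();
            Arrays.sort(this.upperBounds);
        }

        @Override
        public int partition(Integer key, int numPartitions) {
            int pos = Arrays.binarySearch(upperBounds, key);
            // Exact hit: the key equals a bound and belongs to that partition.
            // Miss: binarySearch returns -(insertionPoint) - 1, and the insertion point is the partition.
            int idx = pos >= 0 ? pos : -(pos + 1);
            // Never return an index outside the partitions that actually exist.
            return Math.min(idx, numPartitions - 1);
        }
    }

    public static void main(String[] args) {
        MyPartitioner partitioner = new MyPartitioner(10, 20, 30, 40);
        for (int key : new int[] {1, 10, 11, 35, 50}) {
            System.out.println(key + " -> partition " + partitioner.partition(key, 5));
        }
    }
}
```

If the bounds are chosen badly, some partitions receive far more records than others, which is why knowing the value distribution matters.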


(In reply to Giacomo Licari's message of 2015-04-17 2:45 GMT-05:00.)