Best way to process data in many files? (FLINK-BATCH)

Best way to process data in many files? (FLINK-BATCH)

Tim Conrad


Dear FLINK community.

I was wondering what would be the recommended (best?) way to achieve this kind of file conversion so that it runs in parallel on all available Flink nodes, since the task is "embarrassingly parallel" (no dependencies between files).

Say I have an HDFS folder that contains multiple structured text files of (x,y) pairs (think of CSV).

For each of these files, I want to do the following (individually per file):

* Read file from HDFS
* Extract dataset(s) from file (e.g. list of (x,y) pairs)
* Apply some filter (e.g. smoothing)
* Do some pattern recognition on smoothed data
* Write results back to HDFS (different format)

Would the following be a good idea?

DataSource<String> fileList = ... // contains list of file names in HDFS

// For each "filename" in list do...
DataSet<FeatureList> featureList = fileList
                .flatMap(new ReadDataSetFromFile()) // flatMap because there might be multiple DataSets in a file
                .map(new Smoothing())
                .map(new FindPatterns());

featureList.writeAsFormattedText( ... )

I have the feeling that Flink does not distribute these independent tasks across the available nodes but executes everything on a single node.


Cheers
Tim




Re: Best way to process data in many files? (FLINK-BATCH)

Till Rohrmann

Hi Tim,

depending on how you create the DataSource<String> fileList, Flink will schedule the downstream operators differently. If you used the ExecutionEnvironment.fromCollection method, then it will create a DataSource with a CollectionInputFormat. This kind of DataSource is always executed with a degree of parallelism of 1. The source will send its collection elements in a round-robin fashion to the downstream operators, which are executed with a higher parallelism. When Flink schedules the downstream operators, it tries to place them close to their inputs. Since all flat map operators have the single data source task as their input, they will be deployed on the same machine if possible.
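To picture the round-robin distribution, here is a plain-Java sketch (an illustrative model only, not Flink's actual scheduling or networking code): a single parallelism-1 source handing its elements out to n downstream channels in turn.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinSketch {
    // Simplified model of a parallelism-1 source feeding n downstream
    // subtasks round-robin; element i goes to channel i mod n.
    static List<List<String>> distribute(List<String> elements, int parallelism) {
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            channels.add(new ArrayList<>());
        }
        for (int i = 0; i < elements.size(); i++) {
            channels.get(i % parallelism).add(elements.get(i));
        }
        return channels;
    }

    public static void main(String[] args) {
        List<String> files = List.of("a.csv", "b.csv", "c.csv", "d.csv", "e.csv");
        // Two downstream subtasks share the five elements.
        System.out.println(distribute(files, 2)); // [[a.csv, c.csv, e.csv], [b.csv, d.csv]]
    }
}
```

The data is spread evenly, but note that this says nothing about *where* the subtasks run, which is exactly the scheduling issue described above.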

In contrast, if you had a parallel data source consisting of multiple source tasks, then these tasks would be independent and spread out across your cluster. In this case, every flat map task would have a single distinct source task as its input. When the flat map tasks are deployed, they are placed on the machine where their corresponding source is running. Since the source tasks are spread out across the cluster, the flat map tasks are spread out as well.

What you could do to mitigate your problem is to start the cluster with as many slots as your maximum degree of parallelism. That way, you'll utilize all cluster resources.

I hope this clarifies a bit why you observe that tasks tend to cluster on a single machine.

Cheers,
Till



Re: Best way to process data in many files? (FLINK-BATCH)

Tim Conrad
Hi Till (and others).

Thank you very much for your helpful answer.

On 23.02.2016 14:20, Till Rohrmann wrote:
[...] In contrast, if you had a parallel data source which would consist of multiple source tasks, then these tasks would be independent and spread out across your cluster [...]

Could you please send me a link to an example, or to the relevant Flink API docs, where I can see which data sources are parallel and how to create one with multiple source tasks?

A simple Google search did not provide me with an answer (maybe I used the wrong keywords, though...).


Cheers
Tim







Reply | Threaded
Open this post in threaded view
|

Re: Best way to process data in many files? (FLINK-BATCH)

Till Rohrmann

Hi Tim,

unfortunately, this is not documented explicitly as far as I know. For the InputFormats there is a marker interface called NonParallelInput. Input formats which implement this interface will be executed with a parallelism of 1. At the moment, this holds for the CollectionInputFormat, the IteratorInputFormat and the JDBCInputFormat.
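As a toy model of how such a marker interface can gate parallelism (only the name NonParallelInput is taken from Flink; the stand-in format classes and the decideParallelism helper below are made up for illustration and are not Flink internals):

```java
public class MarkerSketch {
    interface InputFormat {}
    // Marker interface: a format carrying it cannot be split into parallel reads.
    interface NonParallelInput {}

    static class CollectionInputFormat implements InputFormat, NonParallelInput {}
    static class TextInputFormat implements InputFormat {} // splittable, runs in parallel

    // Formats carrying the marker are pinned to a single source task.
    static int decideParallelism(InputFormat format, int defaultParallelism) {
        return (format instanceof NonParallelInput) ? 1 : defaultParallelism;
    }

    public static void main(String[] args) {
        System.out.println(decideParallelism(new CollectionInputFormat(), 8)); // 1
        System.out.println(decideParallelism(new TextInputFormat(), 8));       // 8
    }
}
```

The point of the marker-interface design is that the scheduler only needs an instanceof check, rather than every format exposing a "how parallel am I" method.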

I hope this helps.

Cheers,
Till







Re: Best way to process data in many files? (FLINK-BATCH)

Gábor Gévay
Hello,

> // For each "filename" in list do...
> DataSet<FeatureList> featureList = fileList
>                 .flatMap(new ReadDataSetFromFile()) // flatMap because there might be multiple DataSets in a file

What happens if you just insert .rebalance() before the flatMap?

> This kind of DataSource will only be executed
> with a degree of parallelism of 1. The source will send it’s collection
> elements in a round robin fashion to the downstream operators which are
> executed with a higher parallelism. So when Flink schedules the downstream
> operators, it will try to place them close to their inputs. Since all flat
> map operators have the single data source task as an input, they will be
> deployed on the same machine if possible.

Sorry, I'm a little confused here. Do you mean that the flatMap will
have a high parallelism, but with all instances on a single machine?
I tried to reproduce the situation where I have a non-parallel
data source followed by a flatMap, and the plan shows that the flatMap
actually has parallelism 1, which would be an alternative explanation
for the original problem of everything executing on a single machine.
If I then insert .rebalance() after the source, a "Partition"
operation appears between the source and the flatMap, and the flatMap
has a high parallelism. I think this should also solve the problem,
without having to write a parallel data source.

Best,
Gábor

Re: Best way to process data in many files? (FLINK-BATCH)

Till Rohrmann

If I’m not mistaken, this shouldn’t solve the scheduling peculiarity of Flink. Flink will still deploy the tasks of the flat map operation to the machine where the source task is running. Only after this machine has no more slots left will other machines be used as well.

Also, I think that you don’t need an explicit rebalance() here; Flink will automatically insert the PartitionMethod.REBALANCE strategy.

Cheers,
Till




Re: Best way to process data in many files? (FLINK-BATCH)

Tim Conrad
Dear Till and others.

I solved the issue by using the strategy suggested by Till, like this:

        List<String> fileListOfSpectra = ...
        SplittableList<String> fileListOfSpectraSplittable = new SplittableList<String>( fileListOfSpectra );
        DataSource<String> fileListOfSpectraDataSource = env.fromParallelCollection( fileListOfSpectraSplittable, String.class );

and then - as before -

        DataSet<Peaklist> peakLists = fileListOfSpectraDataSource
                .flatMap(new ReadDataFromFile())
                ...

(Find the source for the class "SplittableList" below.) Now Flink distributes the tasks to all available Flink nodes.

Thanks for the help!

Cheers
Tim





import org.apache.flink.util.SplittableIterator;

import java.util.Iterator;
import java.util.List;

public class SplittableList<T> extends SplittableIterator<T> {

    private final List<T> list;
    private int cursor;

    public SplittableList(List<T> list) {
        this.cursor = 0;
        this.list = list;
    }

    @Override
    @SuppressWarnings("unchecked")
    public Iterator<T>[] split(int numPartitions) {

        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }

        Iterator<T>[] iters = new Iterator[numPartitions];

        if (numPartitions == 1) {
            iters[0] = new SplittableList<T>(list);
            return iters;
        }

        // The first numPartitions - 1 splits get floor(size / numPartitions)
        // elements each; the last split takes the remainder.
        int partSize = list.size() / numPartitions;
        for (int i = 0; i < numPartitions - 1; i++) {
            List<T> subFileList = list.subList(i * partSize, (i + 1) * partSize);
            iters[i] = new SplittableList<T>(subFileList);
        }

        List<T> subFileList = list.subList((numPartitions - 1) * partSize, list.size());
        iters[numPartitions - 1] = new SplittableList<T>(subFileList);

        return iters;
    }

    @Override
    public int getMaximumNumberOfSplits() {
        return list.size();
    }

    @Override
    public boolean hasNext() {
        return cursor < list.size();
    }

    @Override
    public T next() {
        return list.get(cursor++);
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Remove is not supported.");
    }
}
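As a quick sanity check of the splitting scheme above, here is a standalone re-implementation of the same partitioning arithmetic (plain Java, no Flink dependency), which shows that every element lands in exactly one split and only the last split picks up the remainder:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitCheck {
    // Same arithmetic as SplittableList.split(): numPartitions - 1 chunks
    // of floor(size / numPartitions) elements, remainder in the last chunk.
    static <T> List<List<T>> split(List<T> list, int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }
        List<List<T>> parts = new ArrayList<>();
        int partSize = list.size() / numPartitions;
        for (int i = 0; i < numPartitions - 1; i++) {
            parts.add(list.subList(i * partSize, (i + 1) * partSize));
        }
        parts.add(list.subList((numPartitions - 1) * partSize, list.size()));
        return parts;
    }

    public static void main(String[] args) {
        List<String> files = List.of("f1", "f2", "f3", "f4", "f5", "f6", "f7");
        // Seven files over three splits: 2 + 2 + 3.
        System.out.println(split(files, 3)); // [[f1, f2], [f3, f4], [f5, f6, f7]]
    }
}
```

One consequence of this scheme worth knowing: with many partitions the last split can be noticeably larger than the others (up to numPartitions - 1 extra elements), so the last source task gets the most work.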