How to create a stream of data batches


How to create a stream of data batches

Andres R. Masegosa
Hi,

I'm trying to code some machine learning algorithms, such as variational
Bayes learning algorithms, on top of Flink. Instead of working at the
level of individual data elements (i.e., using map transformations), it
would be far more efficient to work at the level of batches of elements
(i.e., I get a batch of elements and produce some output).

I could code that using the "mapPartition" function, but I cannot
control the size of the partition, can I?

Is there any way to transform a stream (or DataSet) of elements into a
stream (or DataSet) of batches of the same size?


Thanks for your support,
Andres

Re: How to create a stream of data batches

Matthias J. Sax-2
Hi Andres,

you could do this by using your own data type, for example

> public class MyBatch {
>   private ArrayList<MyTupleType> data = new ArrayList<MyTupleType>();
> }

In the DataSource, you need to specify your own InputFormat that reads
multiple tuples into a batch and emits the whole batch at once.

However, be aware that this POJO type hides the batch nature from
Flink, i.e., Flink does not know anything about the tuples in the batch.
To Flink, a batch is a single tuple. If you want to perform key-based
operations, this might become a problem.
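Fleshing out the idea above, here is a minimal plain-Java sketch (no Flink dependency; `MyBatch` and the element type are placeholders) of the batch POJO together with the fill loop that a custom InputFormat would run before emitting the whole batch as a single record:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class MyBatch<T> {
    private final List<T> data = new ArrayList<>();

    public void add(T element) { data.add(element); }
    public List<T> getData() { return data; }
    public int size() { return data.size(); }

    /** Reads up to batchSize elements from the source, mirroring what a
     *  custom InputFormat would do before emitting the batch as one record. */
    public static <T> MyBatch<T> readBatch(Iterator<T> source, int batchSize) {
        MyBatch<T> batch = new MyBatch<>();
        while (batch.size() < batchSize && source.hasNext()) {
            batch.add(source.next());
        }
        return batch; // may hold fewer elements at the end of the input
    }
}
```

As noted above, Flink would treat each `MyBatch` value as a single opaque record, so keys inside the batch are invisible to the runtime.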

-Matthias

On 09/04/2015 01:00 PM, Andres R. Masegosa  wrote:



Re: How to create a stream of data batches

Stephan Ewen
Interesting question; you are the second person to ask that.

Batching in user code is one way, as Matthias said. We have on the roadmap a way to transform a stream into a set of batches, but it will be a while until this is in. See https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams

What kind of operation do you want to do on the batches? Will the batches communicate (repartition, group, join), or will they only do partition-local work?
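Batching in user code, for example inside a mapPartition-style function, could look like the following plain-Java sketch (no Flink types; the helper name is made up). The point is that the batch size is controlled by the code itself, independently of how Flink sized the partition:

```java
import java.util.ArrayList;
import java.util.List;

public class Rebatch {
    /** Splits the elements of one partition into fixed-size batches;
     *  the last batch may be smaller if the element count is not a
     *  multiple of batchSize. */
    public static <T> List<List<T>> intoBatches(Iterable<T> partition, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>(batchSize);
        for (T element : partition) {
            current.add(element);
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>(batchSize);
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // trailing partial batch
        }
        return batches;
    }
}
```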

On Fri, Sep 4, 2015 at 1:12 PM, Matthias J. Sax <[hidden email]> wrote:

Re: How to create a stream of data batches

Juan Rodríguez Hortalá
Hi, 

I'm just a Flink newbie, but I'd suggest using window operators with a count policy for that.
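The count-based window idea can be mimicked in plain Java (a sketch only, not the Flink API; all names are hypothetical) as a small stateful batcher that emits a full batch each time N elements have arrived, much like a tumbling count window would:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class CountBatcher<T> {
    private final int batchSize;
    private List<T> buffer = new ArrayList<>();

    public CountBatcher(int batchSize) { this.batchSize = batchSize; }

    /** Adds one element; returns a full batch exactly when the count fires. */
    public Optional<List<T>> add(T element) {
        buffer.add(element);
        if (buffer.size() == batchSize) {
            List<T> full = buffer;
            buffer = new ArrayList<>(); // start the next "window"
            return Optional.of(full);
        }
        return Optional.empty();
    }
}
```

In a real Flink job the same effect would come from a count-based window on the stream, with the window function receiving the batched elements.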


Hope that helps. 

Greetings, 

Juan 


2015-09-04 14:14 GMT+02:00 Stephan Ewen <[hidden email]>: