Fwd: External Talk: Apache Flink - Speakers: Kostas Tzoumas (CEO dataArtisans), Stephan Ewen (CTO dataArtisans)

Kostas Tzoumas
Hi everyone,

I'm forwarding a private conversation to the list with Mats' approval.

The problem is how to compute the correlation between time series in Flink. We have two time series, U and V, and need to compute 1000 correlation measures between them, where each measure shifts one series by one more item: corr(U[0:N], V[n:N+n]) for n = 0 to 999.

Any ideas on how one can do that without a Cartesian product?
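
For reference, here is a minimal single-machine sketch in plain Scala of the quantity being asked about, with no Flink involved. It assumes both series have the same length and fit in memory. The names LaggedCorrelation, pearson, laggedCorrelations and lags are made up for illustration. The window length N is chosen so that the largest shift still fits inside V.

```scala
object LaggedCorrelation {
  /** Pearson correlation of two equally long, non-empty sequences. */
  def pearson(x: Seq[Double], y: Seq[Double]): Double = {
    require(x.length == y.length && x.nonEmpty, "inputs must be non-empty and equally long")
    val n = x.length
    val meanX = x.sum / n
    val meanY = y.sum / n
    val cov  = x.zip(y).map { case (a, b) => (a - meanX) * (b - meanY) }.sum
    val varX = x.map(a => math.pow(a - meanX, 2)).sum
    val varY = y.map(b => math.pow(b - meanY, 2)).sum
    cov / math.sqrt(varX * varY)
  }

  /** corr(U[0:N], V[n:N+n]) for n = 0 until lags, with N = u.length - (lags - 1). */
  def laggedCorrelations(u: IndexedSeq[Double], v: IndexedSeq[Double], lags: Int): IndexedSeq[Double] = {
    require(u.length == v.length && lags >= 1 && lags <= u.length, "invalid input sizes")
    val window = u.length - (lags - 1)
    // Every lag is an independent computation over a slice of each series.
    (0 until lags).map(n => pearson(u.slice(0, window), v.slice(n, n + window)))
  }
}
```

Each lag only needs a slice of U and a shifted slice of V, so the lags can be computed independently of each other. The open question is how to get those slices to the workers without replicating everything.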


---------- Forwarded message ----------
From: Mats Zachrison <[hidden email]>
Date: Tue, Mar 31, 2015 at 9:21 AM
To: Kostas Tzoumas <[hidden email]>, Stefan Avesand <[hidden email]>
Cc: "[hidden email]" <[hidden email]>

As Stefan said, what I’m trying to achieve is basically a nice way to do a correlation between two large time series. Since I’m looking for an optimal delay between the two series, I’d like to delay one of the series by x observations when doing the correlation, and step x from 1 to 1000.


Some pseudo code:

  For (x = 1 to 1000)
      Shift Series A ‘x-1’ steps
      Correlation[x] = Correlate(Series A and Series B)
  End For

In R, using cor() and apply(), this could look like:

  shift <- as.array(c(1:1000))
  corrAB <- apply(shift, 1, function(x) cor(data[x:nrow(data), ]$ColumnA, data[1:(nrow(data) - (x - 1)), ]$ColumnB))



Since these are basically 1000 independent correlation calculations, they are fairly easy to parallelize. Here is an R example using foreach() and the doParallel package:

  cl <- makeCluster(3)
  registerDoParallel(cl)

  corrAB <- foreach(step = c(1:1000)) %dopar% {
        corrAB <- cor(data[step:nrow(data), ]$ColumnA, data[1:(nrow(data) - (step - 1)), ]$ColumnB)
  }

  stopCluster(cl)




So I guess the question is – how to do this in a Flink environment? Do we have to define how to parallelize the algorithm, or can the cluster take care of that for us?


And of course this is most interesting on a generic level – given the environment of a multi-core or multi-processor setup running Flink, how hard is it to take advantage of all the clock cycles? Do we have to split the algorithm and the data and distribute the processing ourselves, or can the system do much of that for us?






Re: Fwd: External Talk: Apache Flink - Speakers: Kostas Tzoumas (CEO dataArtisans), Stephan Ewen (CTO dataArtisans)

Sebastian Schelter-2
How large are the individual time series?



Re: Fwd: External Talk: Apache Flink - Speakers: Kostas Tzoumas (CEO dataArtisans), Stephan Ewen (CTO dataArtisans)

Till Rohrmann
I don't know whether my ideas are much better than the Cartesian product solution. As a matter of fact, at some point we have to replicate the data to be able to compute the correlations in parallel. I basically had three ideas:

1. Broadcast U and V and simply compute the correlation for different shifts in a mapper. This only works if the time series data is small enough to be kept in the memory of a task manager.
2. Create a join key for each shift and element, join the elements, and reduce them to obtain the final result. This has a communication complexity of (n^2+n)/2, which is asymptotically the same as the Cartesian product solution. But this solution will probably run for arbitrarily large correlation intervals.

So let's say we have (u1, u2, u3) and (v1, v2, v3). Then we would first create the join keys as (shift, position, value) tuples: (1, 1, u1), (2, 1, u1), (3, 1, u1), (1, 2, u2), (2, 2, u2), (1, 3, u3), (1, 1, v1), (1, 2, v2), (2, 1, v2), (1, 3, v3), (2, 2, v3), (3, 1, v3). Then join on the first and second field and compute u*v with the first field as key. Reducing on this field then lets you compute the correlation.

3. Group the elements of each subinterval with respect to their shift value and join both grouped subintervals. Then compute the correlation. This again only works if the grouped data can be kept on the heap of the task manager.
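
The join-key idea (2) can be sketched with plain Scala collections standing in for Flink's flatMap, join and reduce. This is only an illustration under assumptions: shifts and positions are 1-based as in the example, the names JoinKeyCorrelation, Sums and correlate are hypothetical, and besides u*v the reduce also carries the counts, sums and squared sums that a Pearson correlation needs.

```scala
object JoinKeyCorrelation {
  // Running sums needed for a Pearson correlation: count, sum(u), sum(v), sum(u^2), sum(v^2), sum(u*v).
  final case class Sums(n: Long, su: Double, sv: Double, suu: Double, svv: Double, suv: Double) {
    def +(o: Sums): Sums =
      Sums(n + o.n, su + o.su, sv + o.sv, suu + o.suu, svv + o.svv, suv + o.suv)

    def correlation: Double = {
      val cov  = suv - su * sv / n
      val varU = suu - su * su / n
      val varV = svv - sv * sv / n
      cov / math.sqrt(varU * varV)
    }
  }

  /** Returns shift -> correlation, with 1-based shifts as in the example. */
  def correlate(u: Vector[Double], v: Vector[Double], maxShift: Int): Map[Int, Double] = {
    // u_i is replicated once per shift s that still has a partner for it: key (s, i).
    val uKeyed = for {
      (x, idx) <- u.zipWithIndex
      i = idx + 1
      s <- 1 to math.min(maxShift, v.length - i + 1)
    } yield ((s, i), x)

    // v_j pairs with u_(j - s + 1) under shift s: key (s, j - s + 1).
    val vKeyed = (for {
      (y, idx) <- v.zipWithIndex
      j = idx + 1
      s <- 1 to math.min(maxShift, j)
    } yield ((s, j - s + 1), y)).toMap

    // "Join" on (shift, position), then "reduce" per shift.
    uKeyed
      .collect { case (key @ (s, _), x) if vKeyed.contains(key) =>
        val y = vKeyed(key)
        s -> Sums(1, x, y, x * x, y * y, x * y)
      }
      .groupBy(_._1)
      .map { case (s, keyed) => s -> keyed.map(_._2).reduce(_ + _).correlation }
  }
}
```

With the three-element example and maxShift = 3 this generates the same twelve keyed tuples as listed above. Shift 3 then has only a single pair, so its correlation is undefined (NaN).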


Re: Fwd: External Talk: Apache Flink - Speakers: Kostas Tzoumas (CEO dataArtisans), Stephan Ewen (CTO dataArtisans)

Sebastian Schelter-2
For some similarity/correlation measures, it is also possible to discard
candidate pairs early, if a threshold for the resulting correlation is
given. This could help to fight the quadratic nature of the problem.
Looking for papers on similarity search might help.



Re: Fwd: External Talk: Apache Flink - Speakers: Kostas Tzoumas (CEO dataArtisans), Stephan Ewen (CTO dataArtisans)

MatsZ
Thanks for the input! The data is some NW data, measured for about 1.5 hours and aggregated per millisecond, i.e. 5.4 M rows. Sorry, I should have included that information to begin with.
Best regards, Mats


Re: Fwd: External Talk: Apache Flink - Speakers: Kostas Tzoumas (CEO dataArtisans), Stephan Ewen (CTO dataArtisans)

Till Rohrmann-2
What is NW data? And is each row basically a double?




View this message in context: http://apache-flink-incubator-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-External-Talk-Apache-Flink-Speakers-Kostas-Tzoumas-CEO-dataArtisans-Stephan-Ewen-CTO-dataArtisan-tp955p975.html
Sent from the Apache Flink (Incubator) User Mailing List archive at Nabble.com.


Re: Fwd: External Talk: Apache Flink - Speakers: Kostas Tzoumas (CEO dataArtisans), Stephan Ewen (CTO dataArtisans)


Sorry for not replying earlier. I've been on vacation.
NW in the previous message stands for network, so the data in question is basically an aggregate of packet traces. We have two columns, one for the bytes sent during each millisecond and one for the bytes received during the same time period.

As described earlier, we wanted to quickly check whether there is any correlation between received and sent bytes for different lags between the columns.

I eventually parallelized this computation using Spark. MLlib has a fairly new function called sliding, which can be applied to an RDD with a given window size (in our case 1000). I then use an accumulator holding an array with one entry for each of the 1000 correlations, i.e. no map/reduce phases, only one foreach.

I think the sliding function would be a nice addition to Flink, as it would enable many algorithms for time series analysis.

Snippet from the code:
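// sliding() on an RDD comes from MLlib's RDDFunctions
import org.apache.spark.mllib.rdd.RDDFunctions._

// Calculate the numerator and denominator parts of the Pearson correlation formula.
// Note: accumulating an Array[Double] needs an implicit AccumulatorParam[Array[Double]]
// that adds arrays element-wise (not shown in this snippet).
val corr_numerator = sc.accumulator(new Array[Double](window_size))
val y1_adjsum = sc.accumulator(0.0)
val y2_adjsum = sc.accumulator(0.0)
data.sliding(window_size).foreach(w => {
  // w(0) is the current row; the k-th element of w is the row k steps later
  corr_numerator += w.map(v => (w(0)(y1_idx) - y1_mean) * (v(y2_idx) - y2_mean))
  y1_adjsum += math.pow(w(0)(y1_idx) - y1_mean, 2)
  y2_adjsum += math.pow(w(0)(y2_idx) - y2_mean, 2)
})

// Calculate the Pearson correlation for each element in the sliding window
val corr_denom = math.pow(y1_adjsum.value * y2_adjsum.value, 0.5)
val res = corr_numerator.value.map(x => x / corr_denom)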
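
To try the same accumulation scheme without a cluster, the snippet can be mirrored on a local collection with Scala's built-in sliding. This is a sketch under assumptions, not the Spark code: each row is an Array(y1, y2), the means are taken over all rows, and the names SlidingLagCorrelation and correlate are invented for illustration. Like the snippet above, it normalizes every lag with one shared denominator, so the values approximate the per-lag Pearson correlation rather than reproduce it exactly.

```scala
object SlidingLagCorrelation {
  /** rows(t) = Array(y1, y2); returns one correlation estimate per lag 0 until windowSize. */
  def correlate(rows: Vector[Array[Double]], windowSize: Int,
                y1Idx: Int = 0, y2Idx: Int = 1): Array[Double] = {
    require(windowSize >= 1 && rows.length >= windowSize, "need at least one full window")
    val y1Mean = rows.map(r => r(y1Idx)).sum / rows.length
    val y2Mean = rows.map(r => r(y2Idx)).sum / rows.length

    val numerator = new Array[Double](windowSize) // one running sum per lag
    var y1AdjSum = 0.0
    var y2AdjSum = 0.0

    // Since rows.length >= windowSize, every window holds exactly windowSize rows.
    rows.sliding(windowSize).foreach { w =>
      val d1 = w(0)(y1Idx) - y1Mean
      for (k <- 0 until windowSize) {
        // Pair the current y1 value with the y2 value k steps later.
        numerator(k) += d1 * (w(k)(y2Idx) - y2Mean)
      }
      y1AdjSum += d1 * d1
      y2AdjSum += math.pow(w(0)(y2Idx) - y2Mean, 2)
    }

    val denominator = math.sqrt(y1AdjSum * y2AdjSum)
    numerator.map(_ / denominator)
  }
}
```

The index of the largest value in the returned array is the lag at which the second column best follows the first.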

Best regards,