Cross operation on two huge datasets


Cross operation on two huge datasets

Gwenhael Pasquiers

Hi,

I need (or at least I think I do) to do a cross operation between two huge datasets. One dataset is a list of points; the other is a list of shapes (areas).

I want to know, for each point, which areas it belongs to (areas may overlap, so a point can be in multiple areas). Since I need to test each point against each area, I thought I'd "cross" my points and areas.

I tried it; my job seems to work for a few seconds but then, at some point, it gets stuck.

I'm wondering whether Flink, for cross operations, tries to load one of the two datasets into RAM, or whether it is able to split the job into multiple iterations (even if that means reading one of the two datasets multiple times).

Or maybe I'm going about it the wrong way, or missing some parameters; feel free to correct me :-)

I'm using Flink 1.0.1.

Thanks in advance,

Gwen’


Re: Cross operation on two huge datasets

Fabian Hueske-2
Hi Gwen,

Flink usually performs a block nested loop join to cross two data sets.
This algorithm spills one input to disk and streams the other input. For each input it fills a memory buffer. The buffer of the spilled input is then repeatedly refilled with spilled records and the two buffers are crossed, until one full pass over the spilled records is done. Then the buffer of the streamed input is filled with the next records, and the process repeats.

You should be aware that cross is a super expensive operation, especially if you evaluate a complex condition for each pair of records, so a cross can easily be too expensive to compute.
For such use cases it is usually better to apply a coarse-grained spatial partitioning and do a key-based join on the partitions. Within each partition you'd perform a cross.

Best, Fabian
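For reference, a minimal sketch of the kind of plain cross Gwen describes; Point and Shape are hypothetical POJOs (with a contains() test), and the DataSet placeholders are assumptions:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

DataSet<Point> points = ...  // hypothetical POJO with coordinates
DataSet<Shape> shapes = ...  // hypothetical POJO with a contains(Point) test

// The default cross emits a Tuple2<Point, Shape> for every pair of records,
// so the cost is |points| * |shapes| pairs (quadratic), as described above.
DataSet<Tuple2<Point, Shape>> matches = points
    .cross(shapes)
    .filter(pair -> pair.f1.contains(pair.f0));  // keep only containing areas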





Re: Cross operation on two huge datasets

Xingcan Cui
Hi all,

@Gwen From the database's point of view, the only way to avoid a Cartesian product in a join is to use an index, which manifests as key grouping in Flink. However, key grouping only supports many-to-one mappings for now, i.e., a shape or a point can only be distributed to a single group. Only points and shapes belonging to the same group can be joined, which reduces the inherent pair comparisons (compared with a Cartesian product). It is perfectly suitable for equi-joins.

@Fabian I saw this thread while I was thinking about theta-joins (which will eventually be supported) in Flink. Since it's impossible to group (index) a dataset for an arbitrary theta-join, I think we need some duplication mechanism here, e.g., split one dataset into n parts and send the other dataset to all of these parts. This could be even more useful for stream joins. BTW, it seems that I've seen another thread discussing this, but I cannot find it now. What do you think?

Best,
Xingcan





RE: Cross operation on two huge datasets

Gwenhael Pasquiers

Hi, and thanks for your answers!

I'm not sure I can define any index to split the workload, since in my case any point could be in any zone...

I think I'm currently trying to do it the way you call a "theta-join":

1. Split one dataset over the cluster and prepare it to be worked against the other one (e.g., parse the shapes), either
   a. using partitioning, or
   b. using N sources plus hash-based filtering, so that I get complementary datasets.
2. Make the other dataset go "through" all the "splits" of the first one and enrich/filter it. That dataset would probably have to be read entirely from HDFS multiple times (once per "split").

I have other ideas, but I don't know whether they are doable in Flink.

Question: is there a way for an object (key selector, flatmap) to obtain (and wait for) the result of a previous dataset? The only way I can think of is a "cross" between my one-record dataset (the result) and the other dataset, but maybe that's very bad resource-wise?

I'd like to try using a flatmap that clones the dataset into N parts (adding a partition key 0 to N-1 to each record), then use partitioning to dispatch each clone of the dataset to a matching "shape matcher" partition; then I'd use cross to do the work, then group the results back together (in case N clones of a point were inside different shapes). Maybe that would reduce the workload of the cross by dividing the size of one of its two inputs. A sketch of this cloning idea follows below.

Sorry for my rambling if I'm not clear.

B.R.
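A minimal sketch of that cloning idea, assuming the parallelism is a known constant n and Point is a hypothetical placeholder type:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

final int n = 4;             // number of "shape matcher" partitions (assumed known)
DataSet<Point> points = ...  // hypothetical input

DataSet<Tuple2<Integer, Point>> clonedPoints = points
    // Emit n clones of every point, tagged with the partition keys 0..n-1.
    .flatMap(new FlatMapFunction<Point, Tuple2<Integer, Point>>() {
        @Override
        public void flatMap(Point p, Collector<Tuple2<Integer, Point>> out) {
            for (int i = 0; i < n; i++) {
                out.collect(new Tuple2<>(i, p));
            }
        }
    })
    // Route each clone to the partition that matches its key (tuple field 0).
    .partitionCustom(new Partitioner<Integer>() {
        @Override
        public int partition(Integer key, int numPartitions) {
            return key % numPartitions;
        }
    }, 0);

Each partition would then cross its share of the shapes against a full copy of the points, and a final groupBy would merge the per-partition matches of each point.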

 

 



Re: Cross operation on two huge datasets

Fabian Hueske-2
Hi,

Flink's batch DataSet API does already support (manual) theta-joins via the CrossFunction. It combines each pair of records of two input data sets. This is done by broadcasting (and hence replicating) one of the inputs.
@Xingcan, so I think what you describe is already there.
However, as I said before, it is often prohibitively expensive to compute. When you are at the point where a MapFunction with a broadcast set is no longer sufficient (the smaller data set does not fit into memory), your problem is often too big to compute.
The complexity of a Cartesian product (Cross) is simply quadratic.

Regarding the specific problem of joining spatial shapes and points, I would go with a spatial partitioning as follows (see the sketch below):
- Partition the space and compute, for each shape, which partitions it belongs to (could be more than one).
- Do the same for the points (each point belongs to exactly one partition).
- Do a 1-n join on the partition ids, plus an additional check whether the point is actually in the shape.

The challenge here is to have partitions of similar size.

Cheers, Fabian
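A minimal sketch of this scheme; shapes and points are the input DataSets, and cellsOf(shape), cellOf(point) (mapping geometry onto the ids of a fixed grid) and Shape.contains(Point) are hypothetical helpers:

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Replicate each shape into every grid cell it overlaps (possibly several).
DataSet<Tuple2<Long, Shape>> keyedShapes = shapes
    .flatMap(new FlatMapFunction<Shape, Tuple2<Long, Shape>>() {
        @Override
        public void flatMap(Shape s, Collector<Tuple2<Long, Shape>> out) {
            for (long cell : cellsOf(s)) {
                out.collect(new Tuple2<>(cell, s));
            }
        }
    });

// Each point falls into exactly one grid cell.
DataSet<Tuple2<Long, Point>> keyedPoints = points
    .map(new MapFunction<Point, Tuple2<Long, Point>>() {
        @Override
        public Tuple2<Long, Point> map(Point p) {
            return new Tuple2<>(cellOf(p), p);
        }
    });

// 1-n equi-join on the cell id, verifying exact containment per candidate pair.
// A point lives in a single cell, so a (point, shape) pair meets at most once
// and no deduplication is needed.
DataSet<Tuple2<Point, Shape>> matches = keyedPoints
    .join(keyedShapes)
    .where(0).equalTo(0)
    .with(new FlatJoinFunction<Tuple2<Long, Point>, Tuple2<Long, Shape>,
                               Tuple2<Point, Shape>>() {
        @Override
        public void join(Tuple2<Long, Point> p, Tuple2<Long, Shape> s,
                         Collector<Tuple2<Point, Shape>> out) {
            if (s.f1.contains(p.f1)) {
                out.collect(new Tuple2<>(p.f1, s.f1));
            }
        }
    });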



Re: Cross operation on two huge datasets

Fabian Hueske-2
Hi Gwen,

sorry, I didn't read your answer; I was still writing mine when you sent yours ;-)

Regarding your strategy, this is basically what Cross does:
it keeps one input partitioned and broadcasts (replicates) the other one. On each partition, it combines the records of that partition of the first input with all records of the replicated second input.
I think this is what you describe as well, right?

As I wrote before, this approach is quadratic and does not scale to large data sizes.
I would recommend looking into spatial partitioning. Otherwise, I do not see how the problem can be solved for large data sets.

Best, Fabian



Re: Cross operation on two huge datasets

Xingcan Cui
Hi,

@Gwen, sorry that I missed the cross function and showed you the wrong way. @Fabian's answers are what I meant.

Considering that the cross function is so expensive, can we find a way to restrict the broadcast? That is, if groupBy is a many-to-one mapping and cross is an all-to-all mapping, is it possible to define a many-to-many mapping function that broadcasts shapes to more than one (but not all) index areas?

Best,
Xingcan



Re: Cross operation on two huge datasets

Jain, Ankit

Hi Gwen,

I would recommend looking into a data structure called an R-tree, which is designed specifically for this use case, i.e., matching a point to a region (see the sketch below).

 

Thanks

Ankit
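A sketch of the lookup pattern an R-tree enables; this RTree is a hypothetical stand-in for any R-tree library that supports bounding-box queries, not a specific implementation:

import java.util.ArrayList;
import java.util.List;

// Build the tree once from the local shapes (hypothetical bulk-load API).
RTree<Shape> tree = RTree.bulkLoad(localShapes);

List<Shape> areasOf(Point p) {
    List<Shape> result = new ArrayList<>();
    // The tree prunes the search to shapes whose bounding box covers the
    // point, instead of scanning every shape.
    for (Shape candidate : tree.candidatesContaining(p.x, p.y)) {
        if (candidate.contains(p)) {  // exact geometric test on each candidate
            result.add(candidate);
        }
    }
    return result;
}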

 



RE: Cross operation on two huge datasets

Gwenhael Pasquiers

I (almost) made it work the following way:

First job: read all the shapes, repartition() them equally over my N nodes, then on each node fill a static RTree (thanks for the tip).

Second job: read all the points, use a flatmap + custom partitioner to "clone" the dataset to all nodes, then apply a simple flatmap that uses the previously initialized static RTree, adding the shape information to each point. Then do a groupBy to merge the points that were inside multiple shapes (a sketch of this merge step follows below).

This works very well in a local runtime but fails on YARN, because it seems that the taskmanager reloads the jar file between two jobs, making me lose my static RTree (I guess the newly loaded class overwrites the older one).

I have two questions:

- Is there a way to avoid that jar reload? Or can I store my RTree somewhere in the JDK or Flink, locally to the taskmanager, in a way that wouldn't be affected by the jar reload (since it would not be stored in any class from MY jar)?
  - I could also try to do it in a single job, but I don't know how to ensure that some operations (parsing the shapes) are done BEFORE others start handling the points.
- Is there a way to do this cleanly using Flink operators? I'd need to be able to use the result of the iteration of one dataset inside my map.

Something like:

datasetA.flatmap(new MyMapOperator(datasetB))…

In my implementation I would then be able to iterate the whole datasetB BEFORE doing any operation on datasetA. That way I could parse all my shapes into an RTree before handling my points, without relying on statics.

Or any other way that would allow me to do something similar.

Thanks in advance for your insight.

Gwen’
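A minimal sketch of the merge step at the end of the second job, assuming the enrichment flatmap emits hypothetical (pointId, shapeId) pairs:

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;

// enriched: one (pointId, shapeId) record per shape a point clone fell into.
DataSet<Tuple2<Long, Long>> enriched = ...

DataSet<Tuple2<Long, List<Long>>> merged = enriched
    .groupBy(0)  // bring the clones of each point back together
    .reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, List<Long>>>() {
        @Override
        public void reduce(Iterable<Tuple2<Long, Long>> values,
                           Collector<Tuple2<Long, List<Long>>> out) {
            Long pointId = null;
            List<Long> shapeIds = new ArrayList<>();
            for (Tuple2<Long, Long> v : values) {
                pointId = v.f0;
                shapeIds.add(v.f1);
            }
            out.collect(new Tuple2<>(pointId, shapeIds));
        }
    });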

 



Re: Cross operation on two huge datasets

Till Rohrmann

Hi Gwenhael,

if you want to persist operator state, you have to write it out somewhere (e.g. to a shared directory, or by emitting the model via one of Flink’s sinks) and, when creating the new operators, reread it from there (usually in the open() method, or from a Flink source as part of a broadcast data set).

If you want to give a data set to all instances of an operator, then you should broadcast this data set. You can do something like

import java.util.List;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

DataSet<Integer> input = ...
DataSet<Integer> broadcastSet = ...

input.flatMap(new RichFlatMapFunction<Integer, Integer>() {
    // Materialized broadcast variable; the full data set is available on every task.
    private List<Integer> broadcastSet;

    @Override
    public void open(Configuration configuration) {
        broadcastSet = getRuntimeContext().getBroadcastVariable("broadcast");
    }

    @Override
    public void flatMap(Integer integer, Collector<Integer> collector) throws Exception {
        // use broadcastSet here, e.g. to enrich or filter 'integer'
    }
}).withBroadcastSet(broadcastSet, "broadcast");

Cheers,
Till





RE: Cross operation on two huge datasets

Gwenhael Pasquiers

The best option for me would be to make it "persist" inside the JVM heap in some map, since I don't even know whether the structure is Serializable (I could try). But I understand.

As for broadcasting, wouldn't broadcasting the variable cancel out the effort I made to "split" the dataset parsing over the nodes?

 

 



Re: Cross operation on two huge datasets

Till Rohrmann

Yes you’re right about the “split” and broadcasting.

Storing it in the JVM is not a good approach, since you don't know where Flink will schedule the new operator instance. It might be the case that an operator responsible for another partition gets scheduled to this JVM, and then it has the wrong RTree information. Maybe you can model the set of RTrees as a DataSet[(PartitionKey, RTree)] and then join it with the partitioned point data set (sketched below).

Cheers,
Till
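A minimal sketch of that suggestion, assuming the RTree is serializable (Gwen reports below that serializing it actually failed with a StackOverflowError) and exposes a hypothetical query(Point) lookup:

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

DataSet<Tuple2<Integer, RTree>> trees = ...       // one R-tree per partition key
DataSet<Tuple2<Integer, Point>> keyedPoints = ... // points tagged with the same keys

DataSet<Tuple2<Point, Shape>> matches = keyedPoints
    .join(trees)
    .where(0).equalTo(0)  // equi-join on the partition key
    .with(new FlatJoinFunction<Tuple2<Integer, Point>, Tuple2<Integer, RTree>,
                               Tuple2<Point, Shape>>() {
        @Override
        public void join(Tuple2<Integer, Point> p, Tuple2<Integer, RTree> t,
                         Collector<Tuple2<Point, Shape>> out) {
            for (Shape s : t.f1.query(p.f1)) {  // hypothetical R-tree lookup
                out.collect(new Tuple2<>(p.f1, s));
            }
        }
    });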


 


RE: Cross operation on two huge datasets

Gwenhael Pasquiers

I made it so that I don't care where the next operator will be scheduled.

I configured taskslots = 1 and parallelism = number of YARN nodes, so that:

- each node contains 1/N-th of the shapes (a simple repartition() of the shapes dataset);
- the points are cloned so that each partition of the points dataset contains the whole original dataset:
  - the flatmap creates "#parallelism" clones of each entry;
  - a custom partitioner sends each clone of each entry to a different partition.

That way, whatever Flink chooses to do, each point will be compared to each shape. That's why I think that in my case I can keep the RTree in the JVM without issues; I'd prefer to avoid ser/deser-ing that structure.

I tried to use join (all items have the same key), but it looks like Flink tried to serialize the RTree anyway, and it ended in a StackOverflowError (locally with only one partition, not even on YARN).

 

 



Re: Cross operation on two huge datasets

Jain, Ankit

If I understood correctly, you have just implemented Flink broadcasting by hand :-)

You are still sending the whole points dataset out to each shape partition, right?

I think this could be optimized by using a keyBy or a custom partitioning that is common across shapes & points; that should make sure a given point always goes to the same shape node.

I didn't understand why Till Rohrmann said "you don't know where Flink will schedule the new operator instance": new operators are created when the Flink job is started, right? So there should be no new operators once the job is running, and if you use consistent hash partitioning, the same input should always end up at the same task manager node.

You could store the output as Flink state; that would be more fault tolerant, but storing it as a cache in the JVM should work too.

Is this a batch job or a streaming one?

By the way, I am a newbie to Flink, still learning, so take my suggestions with caution :-)

Thanks

Ankit

 



Re: Cross operation on two huge datasets

Xingcan Cui
Hi Gwen,

In my view, indexing and searching are two isolated processes and they should be separated. Maybe you should take the RTree structure as a new dataset (fortunately it's static, right?) and store it in a distributed cache or on DFS so that it can be accessed by operators on any node (a sketch follows below). That will make the mapping from index partition to operator consistent (regardless of the locality problem).
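A minimal sketch of that route using Flink's distributed cache, assuming a pre-built, serialized index file on HDFS; the path, the RTree type and the readRTree deserialization helper are hypothetical:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Ship the pre-built, serialized index to every task manager.
env.registerCachedFile("hdfs:///indexes/rtree.bin", "rtreeIndex");

DataSet<Tuple2<Point, Shape>> enriched = points
    .flatMap(new RichFlatMapFunction<Point, Tuple2<Point, Shape>>() {
        private RTree tree;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Flink has already copied the cached file to this node's local disk.
            File local = getRuntimeContext().getDistributedCache().getFile("rtreeIndex");
            tree = readRTree(local); // hypothetical deserialization helper
        }

        @Override
        public void flatMap(Point p, Collector<Tuple2<Point, Shape>> out) {
            for (Shape s : tree.search(p)) { // hypothetical RTree lookup
                out.collect(new Tuple2<Point, Shape>(p, s));
            }
        }
    });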

Besides, you can build a "weak" index first, e.g., partitioning the points and shapes into "left" and "right"; that way you do not need to broadcast the points to all index nodes (only left to left and right to right).

Best,
Xingcan



RE: Cross operation on two huge datasets

Gwenhael Pasquiers
In reply to this post by Gwenhael Pasquiers

I managed to avoid the class reload by controlling the order of operations using “.withBroadcastSet”.

 

My first task (shape parsing) now outputs an empty “DataSet<Void> synchro”.

 

Then whenever I need to wait for that synchro dataset to be ready (and, above all, for the operations prior to it to be done), I use “.withBroadcastSet(synchro, "synchro")” and do a get for that broadcast variable in my open method.

That way I'm sure that I won't begin testing my points against an incomplete static RTree. And since it's a single job again, my static RTree remains valid :-) (a sketch of the trick follows below).
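In code, the barrier looks roughly like this; StaticRTreeHolder and the domain types are the hypothetical names from this thread's discussion, and the broadcast set carries no data – it is abused purely for ordering:

// Phase 1: parse the shapes into the static, JVM-local RTree; emit nothing.
DataSet<Void> synchro = shapes
    .flatMap(new FlatMapFunction<Shape, Void>() {
        @Override
        public void flatMap(Shape s, Collector<Void> out) {
            StaticRTreeHolder.add(s); // hypothetical static holder
        }
    });

// Phase 2 cannot start before the "synchro" broadcast set is fully materialized,
// i.e. before every shape has gone through phase 1.
DataSet<Tuple2<Point, Shape>> enriched = clonedPoints
    .flatMap(new RichFlatMapFunction<Point, Tuple2<Point, Shape>>() {
        @Override
        public void open(Configuration parameters) {
            getRuntimeContext().getBroadcastVariable("synchro"); // barrier only, result ignored
        }

        @Override
        public void flatMap(Point p, Collector<Tuple2<Point, Shape>> out) {
            for (Shape s : StaticRTreeHolder.query(p)) { // hypothetical lookup
                out.collect(new Tuple2<Point, Shape>(p, s));
            }
        }
    })
    .withBroadcastSet(synchro, "synchro");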

 

Seems to be good for now even if the static thingie is a bit dirty.

 

However, I'm surprised that reading 20 MB of Parquet becomes 21 GB of “bytes sent” by the Flink reader.

 

 



RE: Cross operation on two huge datasets

Gwenhael Pasquiers
In reply to this post by Xingcan Cui

I tried putting my structure in a dataset, but when serializing it Kryo went into an infinite recursive loop (crashed with a StackOverflowError). So I'm staying with the static reference. (If serialization ever becomes necessary, one possible workaround is sketched below.)
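An untested sketch of such a workaround: a custom Kryo serializer that writes only the flat shape list and rebuilds the tree on read, so Kryo never walks the recursive node structure. RTree, Shape, getShapes and insert are hypothetical names, not the thread's actual code.

public class RTreeSerializer extends Serializer<RTree> {

    @Override
    public void write(Kryo kryo, Output output, RTree tree) {
        // Serialize only the flat content, never the linked node graph.
        List<Shape> shapes = tree.getShapes(); // hypothetical accessor
        output.writeInt(shapes.size());
        for (Shape s : shapes) {
            kryo.writeObject(output, s);
        }
    }

    @Override
    public RTree read(Kryo kryo, Input input, Class<RTree> type) {
        int n = input.readInt();
        RTree tree = new RTree();
        for (int i = 0; i < n; i++) {
            tree.insert(kryo.readObject(input, Shape.class)); // rebuild the index
        }
        return tree;
    }
}

// Registered once on the environment:
env.getConfig().registerTypeWithKryoSerializer(RTree.class, RTreeSerializer.class);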

 

As for the partitioning, there is always the case of shapes overlapping both the left and right sections; I think it would take quite a bit of effort to implement. And it's always better if I don't have to manually set a frontier between the two (or n) zones.

 



RE: Cross operation on two huge datasets

Gwenhael Pasquiers

To answer Ankit,

 

It is a batch application.

 

Yes, I admit I did broadcasting by hand. I did it that way because the only other way I found to “broadcast” a DataSet was to use “withBroadcastSet”, and I was afraid that “withBroadcastSet” would make Flink load the whole dataset in memory before broadcasting it rather than sending its elements one by one.

I'll try to use it; I'll take anything that will make my code cleaner!

 



RE: Cross operation on two huge datasets

Gwenhael Pasquiers

Maybe I won't try to broadcast my dataset after all: I finally found again what made me implement it with my own cloning flatmap + custom partitioning (sketched below):

 

Quoted from https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#broadcast-variables

 

Note: As the content of broadcast variables is kept in-memory on each node, it should not become too large. For simpler things like scalar values you can simply make parameters part of the closure of a function, or use the withParameters(...) method to pass in a configuration.
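For completeness, a sketch of that cloning flatmap + custom partitioning, which streams each clone one by one instead of materializing the whole set in memory on each node; the tuple's first field carries the target partition, and all names are assumptions:

final int parallelism = env.getParallelism();

DataSet<Point> clonedPoints = points
    // one clone of every point per target partition
    .flatMap(new FlatMapFunction<Point, Tuple2<Integer, Point>>() {
        @Override
        public void flatMap(Point p, Collector<Tuple2<Integer, Point>> out) {
            for (int i = 0; i < parallelism; i++) {
                out.collect(new Tuple2<Integer, Point>(i, p));
            }
        }
    })
    // route each clone to "its" partition, where 1/N of the shapes live
    .partitionCustom(new Partitioner<Integer>() {
        @Override
        public int partition(Integer key, int numPartitions) {
            return key % numPartitions;
        }
    }, 0)
    // drop the routing key again
    .map(new MapFunction<Tuple2<Integer, Point>, Point>() {
        @Override
        public Point map(Tuple2<Integer, Point> t) {
            return t.f1;
        }
    });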

 
