Flink Mongodb


Re: Flink Mongodb

Fabian Hueske
How about going for an optional parameter for the InputFormat to determine into how many splits each region is split?
That would be a lightweight option to control the number of splits with low effort (on our side).
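
To make the idea concrete, a minimal and purely hypothetical sketch (not existing Flink or mongo-hadoop code) of how such a "splits per region" parameter could subdivide a region's key range might look like this; real MongoDB/HBase keys are of course not simple longs, so this only illustrates the mechanism:

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: cut one region's numeric key range into N roughly equal sub-ranges.
class RegionSubSplitter {

    static class KeyRange {
        final long start; // inclusive
        final long end;   // exclusive
        KeyRange(long start, long end) { this.start = start; this.end = end; }
        @Override public String toString() { return "[" + start + ", " + end + ")"; }
    }

    // splitsPerRegion would be the optional InputFormat parameter discussed above.
    static List<KeyRange> subSplit(KeyRange region, int splitsPerRegion) {
        List<KeyRange> result = new ArrayList<KeyRange>();
        long width = region.end - region.start;
        for (int i = 0; i < splitsPerRegion; i++) {
            long from = region.start + (width * i) / splitsPerRegion;
            long to = region.start + (width * (i + 1)) / splitsPerRegion;
            if (from < to) { // skip empty sub-ranges for very narrow regions
                result.add(new KeyRange(from, to));
            }
        }
        return result;
    }
}

The parameter stays a simple user-facing knob, while the lazy split assignment at runtime takes care of the actual balancing.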

2014-11-05 0:01 GMT+01:00 Flavio Pompermaier <[hidden email]>:
So how are we going to proceed here? Is someone willing to help me improve the splitting policy, or do we leave it as it is for now?


On Tue, Nov 4, 2014 at 9:42 PM, Fabian Hueske <[hidden email]> wrote:
I agree. Going for more splits with smaller key regions is a good idea.
However, it might be a bit difficult to determine a good number of splits, as the size of a split depends on its density. Splits that are too large are prone to cause data skew; splits that are too small increase the overhead of split assignment.

A solution for this problem could be to add an optional parameter to the IF to give an upper bound for the number of InputSplits.

2014-11-04 20:53 GMT+01:00 Stephan Ewen <[hidden email]>:

Typo: it should have meant that workers that get a larger split will get fewer additional splits.

On 04.11.2014 at 20:48, [hidden email] wrote:

InputSplits are assigned lazily at runtime, which gives you many of the benefits of re-assigning without the nastiness.

Can you write the logic that creates the splits such that it creates multiple splits per region? Then the lazy assignment will make sure that workers that get a larger split will get fewer additional splits than workers that get smaller splits...

On 04.11.2014 at 20:32, "Fabian Hueske" <[hidden email]> wrote:

Hmm, that's a good question indeed. I am not familiar with HBase's mode of operation.
I would assume that HBase uses range partitioning to partition a table into regions. That way it is rather easy to balance the size of regions, as long as there is no single key that occurs very often. I am not sure if it is possible to overcome data skew caused by frequent keys.
However, as I said, these are just assumptions. I will have a look at HBase's internals for verification.

In any case, Flink does not currently support reassigning or splitting of InputSplits at runtime.
Also, initially generating balanced InputSplits will be tricky. That would be possible if we could efficiently determine the "density" of a key range when creating the InputSplits. However, I'm a bit skeptical that this can be done...

2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <[hidden email]>:
From what I know, HBase manages the regions, but whether they are evenly distributed depends on a well-designed key;
if that is not the case, you can end up with very unbalanced regions (i.e., hot spotting).

Could it be a good idea to create a split policy that compares the sizes of all the splits and generates equally-sized splits that can be reassigned to a free worker if the originally assigned one is still busy?

On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <[hidden email]> wrote:
ad 1) HBase manages the regions and should also take care of their uniform size.
ad 2) Dynamically changing InputSplits is not possible at the moment. However, the input split generation of the IF should be able to handle such issues upfront. In fact, the IF could also generate multiple splits per region (this would be necessary to make sure that the minimum number of splits is generated if there are fewer regions than required splits).

2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <[hidden email]>:
Ok, thanks for the explanation!
That was more or less how I thought it should be, but there are still points I'd like to clarify:

1 - What if one region is very big and the other regions are very small? There will be one slot that takes a very long time while the others stay inactive.
2 - Do you think it is possible to implement this in an adaptive way (stop processing a huge region if it is worth it and assign the remaining data to inactive task managers)?


On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <[hidden email]> wrote:
Local split assignment preferably assigns input splits to workers that can locally read the data of an input split.
For example, HDFS stores file chunks (blocks) distributed over the cluster and gives every worker access to these chunks via network transfer. However, if a chunk is read by a process that runs on the same node where the chunk is stored, the read operation directly accesses the local file system without going over the network. Hence, it is essential to assign input splits based on the locality of their data if you want reasonable performance. We call this local split assignment. This is a general concept of all data-parallel systems, including Hadoop, Spark, and Flink.

This issue is not related to serializability of input formats.
I assume that the wrapped MongoIF is also not capable of local split assignment.
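
To illustrate the local split assignment described above, here is a minimal sketch of a lazy, locality-preferring assigner; the class and method names are hypothetical and this is not Flink's actual scheduler code:

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: hand out splits lazily, preferring splits whose data is local to the requester.
class LazyLocalSplitAssigner {

    static class Split {
        final String id;
        final List<String> hosts; // nodes that store this split's data locally
        Split(String id, List<String> hosts) { this.id = id; this.hosts = hosts; }
    }

    private final List<Split> unassigned;

    LazyLocalSplitAssigner(List<Split> splits) {
        this.unassigned = new ArrayList<Split>(splits);
    }

    // Called whenever a worker becomes idle and requests its next split.
    synchronized Split nextSplit(String requestingHost) {
        if (unassigned.isEmpty()) {
            return null; // all work has been handed out
        }
        // Prefer a split that can be read from the requesting worker's local disk ...
        for (Split s : unassigned) {
            if (s.hosts.contains(requestingHost)) {
                unassigned.remove(s);
                return s;
            }
        }
        // ... otherwise fall back to any remaining split (remote read).
        return unassigned.remove(0);
    }
}

Because assignment happens one split at a time, a worker that is stuck on a large split simply asks for fewer additional splits, which is the lazy balancing discussed elsewhere in the thread.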

On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
What do you mean by "might lack support for local split assignment"? Do you mean that the InputFormat is not serializable, and that this is not the case for MongoDB?

On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <[hidden email]> wrote:
There's a page about Hadoop Compatibility that shows how to use the wrapper. 

The HBase format should work as well, but might lack support for local split assignment. In that case performance would suffer a lot.

On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
Should I start from http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html ? Is that ok?
Thus, in principle, HBase's TableInputFormat could also be used in a similar way, couldn't it?

On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

the blog post uses Flink's wrapper for Hadoop InputFormats.
This has been ported to the new API and is described in the documentation.

So you just need to take Mongo's Hadoop IF and plug it into the new IF wrapper. :-)

Fabian
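
A rough sketch of what "plugging the Mongo Hadoop IF into the wrapper" could look like (not taken from the thread; it assumes the Flink 0.7 Hadoop-compatibility classes and the mongo-hadoop mapred MongoInputFormat, so package names and generic types may differ between versions):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.hadoop.mapred.JobConf;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.MongoInputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Point the Mongo Hadoop InputFormat at the source collection.
        JobConf conf = new JobConf();
        MongoConfigUtil.setInputURI(conf, "mongodb://localhost:27017/test.test");

        // Wrap the Hadoop (mapred) InputFormat so it can be used as a Flink data source.
        HadoopInputFormat<BSONWritable, BSONWritable> hdIf =
            new HadoopInputFormat<BSONWritable, BSONWritable>(
                new MongoInputFormat(), BSONWritable.class, BSONWritable.class, conf);

        DataSet<Tuple2<BSONWritable, BSONWritable>> input = env.createInput(hdIf);
        input.print();

        env.execute("read from MongoDB");
    }
}

This is essentially the pattern Flavio publishes in the flink-mongodb-test example further down the thread.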

On Tuesday, 4 November 2014, Flavio Pompermaier wrote:

Hi to all,

but it uses the old APIs (HadoopDataSource instead of DataSource).
How can I use MongoDB with the new Flink APIs?

Best,
Flavio







Re: Flink Mongodb

Flavio Pompermaier
Just shared the example at https://github.com/okkam-it/flink-mongodb-test and tweeted about it :)

The next step is to show how to write the result of a Flink process back to Mongo. 
How can I manage to do that? Can someone help me?








Re: Flink Mongodb

Flavio Pompermaier
Any help here..?








Re: Flink Mongodb

Stephan Ewen
Hi Flavio!

I think the general method is the same as with the inputs.


You can then call

DataSet<Tuple2<BSONWritable, BSONWritable>> data = ...;

data.output(mongoOutput);


Greetings,
Stephan
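
For context, the mongoOutput sink in the snippet above would be the Hadoop MongoOutputFormat wrapped in Flink's HadoopOutputFormat. A hedged sketch, reusing the same (version-dependent) mapred wrapper classes as on the input side; note that Flavio's final working version later in the thread uses Text keys instead of BSONWritable keys:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.MongoOutputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

class MongoSinkSketch {

    // Hypothetical helper: wire a DataSet of key/value pairs to a MongoDB collection.
    static void writeToMongo(DataSet<Tuple2<BSONWritable, BSONWritable>> data, String uri) {
        JobConf conf = new JobConf();
        MongoConfigUtil.setOutputURI(conf, uri); // e.g. "mongodb://localhost:27017/test.output"

        HadoopOutputFormat<BSONWritable, BSONWritable> mongoOutput =
            new HadoopOutputFormat<BSONWritable, BSONWritable>(
                new MongoOutputFormat<BSONWritable, BSONWritable>(), conf);

        data.output(mongoOutput);
    }
}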










Re: Flink Mongodb

Flavio Pompermaier
I'm trying to do that, but I can't find the proper typing. For example:

DataSet<String> fin = input.map(new MapFunction<Tuple2<BSONWritable, BSONWritable>, String>() {

    private static final long serialVersionUID = 1L;

    @Override
    public String map(Tuple2<BSONWritable, BSONWritable> record) throws Exception {
        BSONWritable value = record.getField(1);
        BSONObject doc = value.getDoc();
        BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");
        String type = jsonld.getString("@type");
        return type;
    }
});

MongoConfigUtil.setOutputURI(hdIf.getJobConf(), "mongodb://localhost:27017/test.test");
fin.output(new HadoopOutputFormat<BSONWritable, BSONWritable>(
    new MongoOutputFormat<BSONWritable, BSONWritable>(), hdIf.getJobConf()));

Obviously this doesn't work because I'm emitting Strings while trying to write BSONWritables. Can you show me a simple working example?

Best,
Flavio










Re: Flink Mongodb

Stephan Ewen
Hi!
Can you:

  - either return a BSONWritable from the function
  - or type the output formats to String?



Stephan












Re: Flink Mongodb

Flavio Pompermaier
I managed to write back to mongo using this:

MongoConfigUtil.setOutputURI(hdIf.getJobConf(), "mongodb://localhost:27017/test.testData");
// emit result (this works only locally)
fin.output(new HadoopOutputFormat<Text, BSONWritable>(
    new MongoOutputFormat<Text, BSONWritable>(), hdIf.getJobConf()));

So I also updated the example at https://github.com/okkam-it/flink-mongodb-test :)
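
For readers following along: for that output call to type-check, fin now has to be a DataSet<Tuple2<Text, BSONWritable>> rather than a DataSet<String>. A hedged sketch of a map that produces such pairs (the "jsonld"/"@type" fields come from the earlier example; the output document layout here is made up for illustration, and imports for MapFunction, Tuple2, org.apache.hadoop.io.Text, BSONWritable, BSONObject and BasicDBObject are assumed):

// Sketch: turn each input document into a (key, document) pair that MongoOutputFormat can write.
DataSet<Tuple2<Text, BSONWritable>> fin = input.map(
    new MapFunction<Tuple2<BSONWritable, BSONWritable>, Tuple2<Text, BSONWritable>>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<Text, BSONWritable> map(Tuple2<BSONWritable, BSONWritable> record) throws Exception {
            BSONObject doc = record.f1.getDoc();
            BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");
            String type = jsonld.getString("@type");

            // Build a small output document instead of emitting a plain String.
            BasicDBObject out = new BasicDBObject("type", type);
            return new Tuple2<Text, BSONWritable>(new Text(type), new BSONWritable(out));
        }
    });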











Re: Flink Mongodb

Stephan Ewen
Hi Flavio!

Looks very nice :-)

Is the repository going to stay for a while? Can we link to your example from the Flink Website?

Stephan


On Fri, Nov 7, 2014 at 5:01 PM, Flavio Pompermaier <[hidden email]> wrote:
I managed to write back to mongo using this:

MongoConfigUtil.setOutputURI( hdIf.getJobConf(), "mongodb://localhost:27017/test.testData");
// emit result (this works only locally)
fin.output(new HadoopOutputFormat<Text,BSONWritable>(new MongoOutputFormat<Text,BSONWritable>(), hdIf.getJobConf()));

So I updated also the example at https://github.com/okkam-it/flink-mongodb-test :)

On Thu, Nov 6, 2014 at 5:10 PM, Stephan Ewen <[hidden email]> wrote:
Hi!
Can you:

  - either return a BSONWritable from the function
  - or type the output formats to String?



Stephan


On Thu, Nov 6, 2014 at 4:12 PM, Flavio Pompermaier <[hidden email]> wrote:
I'm trying to do that but I can't find the proper typing.. For example:

DataSet<String> fin = input.map(new MapFunction<Tuple2<BSONWritable, BSONWritable>, String>() {

private static final long serialVersionUID = 1L;

@Override
public String map(Tuple2<BSONWritable, BSONWritable> record) throws Exception {
BSONWritable value = record.getField(1);
BSONObject doc = value.getDoc();
BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");
String type = jsonld.getString("@type");
return type;
}
});

MongoConfigUtil.setOutputURI( hdIf.getJobConf(), "mongodb://localhost:27017/test.test");
fin.output(new HadoopOutputFormat<BSONWritable,BSONWritable>(new MongoOutputFormat<BSONWritable,BSONWritable>(), hdIf.getJobConf()));

Obviously this doesn't work because I'm emitting strings and trying to write BSONWritable ..can you show me a simple working example?

Best,
Flavio

On Thu, Nov 6, 2014 at 3:58 PM, Stephan Ewen <[hidden email]> wrote:
Hi Flavio!

I think the general method is the same as with the inputs.


You can then call

DataSet<Tuple2<BSONWritable, BSONWritable>> data = ...;

data.output(mongoOutput);


Greetings,
Stephan


On Thu, Nov 6, 2014 at 3:41 PM, Flavio Pompermaier <[hidden email]> wrote:
Any help here..?

On Wed, Nov 5, 2014 at 6:39 PM, Flavio Pompermaier <[hidden email]> wrote:
Just shared the example at https://github.com/okkam-it/flink-mongodb-test and twitted :)

The next step is to show how to write the result of a Flink process back to Mongo. 
How can I manage to do that? Can someone help me?

On Wed, Nov 5, 2014 at 1:17 PM, Fabian Hueske <[hidden email]> wrote:
How about going for an optional parameter for the InputFormat to determine into how many splits each region is split?
That would be a lightweight option to control the number of splits with low effort (on our side).

2014-11-05 0:01 GMT+01:00 Flavio Pompermaier <[hidden email]>:
So how are we going to proceed here? Is someone willing to help me in improving the splitting policy or we leave it as it is now?


On Tue, Nov 4, 2014 at 9:42 PM, Fabian Hueske <[hidden email]> wrote:
I agree. Going for more splits with smaller key regions is a good idea.
However, it might be a bit difficult to determine a good number of splits as the size of a split depends on its density. Too large splits are prone to cause data skew, too small ones will increase the overhead of split assignment.

A solution for this problem could be to add an optional parameter to the IF to give an upper bound for the number of InputSplits.

2014-11-04 20:53 GMT+01:00 Stephan Ewen <[hidden email]>:

Typo: it should have meant that workers that get a larger split will get fewer additional splits.

Am 04.11.2014 20:48 schrieb [hidden email]:

InputSplits are assigned lazily at runtime, which gives you many of the benefits of re-assigning without the nastyness.

Can you write the logic that creates the splits such that it creates multiple splits per region? Then the lazy assignment will make sure that workers that get a larger split will get get additional splits than workers that get smaller splits...

Am 04.11.2014 20:32 schrieb "Fabian Hueske" <[hidden email]>:

Hmm, that's good question indeed. I am not familiar with HBase's mode of operation.
I would assume, that HBase uses range partitioning to partition a table into regions. That way it is rather easy to balance the size of regions, as long as there is no single key that occurs very often. I am not sure if it is possible to overcome data skew cause by frequent keys.
However as I said, these are just assumption. I will have a look at HBase's internals for verification.

In any case, Flink does currently not support reassigning or splitting of InputSplits at runtime.
Also initially generating balanced InputSplits willl be tricky. That would be possible if we can efficiently determine the "density" of a key range when creating the InputSplits. However, I'm a bit skeptical that this can be done... 

2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <[hidden email]>:
From what I know HBase manages the regions but the fact that they are evenly distributed depends on a well-designed key..
if it is not the case you could encounter very unbalanced regions (i.e. hot spotting).

Could it be a good idea to create a split policy that compares the size of all the splits and generate equally-sized split that can be reassigned to free worker if the original assigned one is still busy?

On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <[hidden email]> wrote:
ad 1) HBase manages the regions and should also take care of their uniform size.
as 2) Dynamically changing InputSplits is not possible at the moment. However, the input split generation of the IF should also be able to handle such issues upfront. In fact, the IF could also generate multiple splits per region (this would be necessary to make sure that the minimum number of splits is generated if there are less regions than required splits).

2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <[hidden email]>:
Ok, thanks for the explanation! 
That was more or less like I thought it should be but there are still points I'd like to clarify:

1 - What if a region is very big and there are other regions very small..? There will be one slot that takes a very long time while the others will stay inactive..
2 - Do you think it is possible to implement this in an adaptive way (stop processing of huge region if it worth it and assign remaining data to inactive task managers)?


On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <[hidden email]> wrote:
Local split assignment preferably assigns input split to workers that can locally read the data of an input split.
For example, HDFS stores file chunks (blocks) distributed over the cluster and gives access to these chunks to every worker via network transfer. However, if a chunk is read from a process that runs on the same node as the chunk is stored, the read operation directly accesses the local file system without going over the network. Hence, it is essential to assign input splits based on the locality of their data if you want to have reasonably performance. We call this local split assignment. This is a general concept of all data parallel systems including Hadoop, Spark, and Flink.

This issue is not related to serializability of input formats.
I assume that the wrapped MongoIF is also not capable of local split assignment.
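As a rough illustration of the concept (not Flink's actual split assigner): splits are handed out lazily, and a worker asking for work preferably receives a split whose data is stored on its own host. The Split type and the assignment loop below are hypothetical.

import java.util.List;

public class LocalSplitAssigner {

    public static class Split {
        public final int id;
        public final List<String> hosts; // hosts that store this split's data
        public Split(int id, List<String> hosts) { this.id = id; this.hosts = hosts; }
    }

    // Returns the next split for 'workerHost': a local one if available, otherwise any remaining one.
    public static Split nextSplit(String workerHost, List<Split> unassigned) {
        for (Split split : unassigned) {
            if (split.hosts.contains(workerHost)) {
                unassigned.remove(split); // local read: no network transfer needed
                return split;
            }
        }
        // no local split left: fall back to a remote split (data is read over the network)
        return unassigned.isEmpty() ? null : unassigned.remove(0);
    }
}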

On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
What do you mean by "might lack support for local split assignment"? Do you mean that the InputFormat is not serializable? And that this is not the case for MongoDB?

On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <[hidden email]> wrote:
There's a page about Hadoop Compatibility that shows how to use the wrapper. 

The HBase format should work as well, but might lack support for local split assignment. In that case performance would suffer a lot.

On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
Should I start from http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html ? Is that ok?
Thus, in principle, the TableInputFormat of HBase could also be used in a similar way, couldn't it?

On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

the blog post uses Flink's wrapper for Hadoop InputFormats.
This has been ported to the new API and is described in the documentation.

So you just need to take Mongo's Hadoop IF and plug it into the new IF wrapper. :-)

Fabian
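For reference, a minimal sketch of what that wiring could look like with the new API. The package names follow the 0.7 Hadoop-compatibility module, and the BSONWritable key/value typing as well as the input URI are assumptions taken from the code posted later in this thread; they may need adjusting for other versions of the mongo-hadoop connector.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.hadoop.mapred.JobConf;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.MongoInputFormat;

public class MongoReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        JobConf conf = new JobConf();
        conf.set("mongo.input.uri", "mongodb://localhost:27017/test.testData");

        // wrap Mongo's Hadoop (mapred) InputFormat so Flink can use it as a regular InputFormat
        HadoopInputFormat<BSONWritable, BSONWritable> hdIf =
            new HadoopInputFormat<BSONWritable, BSONWritable>(
                new MongoInputFormat(), BSONWritable.class, BSONWritable.class, conf);

        DataSet<Tuple2<BSONWritable, BSONWritable>> input = env.createInput(hdIf);
        input.print();

        env.execute("read from MongoDB");
    }
}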

On Tuesday, 4 November 2014, Flavio Pompermaier wrote:

Hi to all,

I found a blog post about using MongoDB with Flink, but it uses the old APIs (HadoopDataSource instead of DataSource).
How can I use Mongodb with the new Flink APIs? 

Best,
Flavio












Re: Flink Mongodb

Flavio Pompermaier
Yes, it will stay there as long as it works :)
However, if you want to bring it into the official Flink examples, that would be even better I think!

Best,
Flavio

On Mon, Nov 10, 2014 at 5:06 PM, Stephan Ewen <[hidden email]> wrote:
Hi Flavio!

Looks very nice :-)

Is the repository going to stay for a while? Can we link to your example from the Flink Website?

Stephan


On Fri, Nov 7, 2014 at 5:01 PM, Flavio Pompermaier <[hidden email]> wrote:
I managed to write back to mongo using this:

MongoConfigUtil.setOutputURI( hdIf.getJobConf(), "mongodb://localhost:27017/test.testData");
// emit result (this works only locally)
fin.output(new HadoopOutputFormat<Text,BSONWritable>(new MongoOutputFormat<Text,BSONWritable>(), hdIf.getJobConf()));

So I also updated the example at https://github.com/okkam-it/flink-mongodb-test :)
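For the snippet above to typecheck, fin has to be a DataSet<Tuple2<Text, BSONWritable>>. A minimal sketch of such a map, reusing the input DataSet from the earlier snippets; the jsonld field and the choice of key are illustrative assumptions, not taken from the linked repository:

// Hedged sketch: emit (Text key, BSONWritable value) pairs that match the
// HadoopOutputFormat<Text, BSONWritable> used above. Field names are illustrative.
DataSet<Tuple2<Text, BSONWritable>> fin = input.map(
    new MapFunction<Tuple2<BSONWritable, BSONWritable>, Tuple2<Text, BSONWritable>>() {
        @Override
        public Tuple2<Text, BSONWritable> map(Tuple2<BSONWritable, BSONWritable> record) throws Exception {
            BSONObject doc = record.f1.getDoc();
            BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");
            // key: the document's @type; value: the full document written back to Mongo
            return new Tuple2<Text, BSONWritable>(new Text(jsonld.getString("@type")), new BSONWritable(doc));
        }
    });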

On Thu, Nov 6, 2014 at 5:10 PM, Stephan Ewen <[hidden email]> wrote:
Hi!
Can you:

  - either return a BSONWritable from the function
  - or type the output formats to String?



Stephan


On Thu, Nov 6, 2014 at 4:12 PM, Flavio Pompermaier <[hidden email]> wrote:
I'm trying to do that, but I can't find the proper typing. For example:

DataSet<String> fin = input.map(new MapFunction<Tuple2<BSONWritable, BSONWritable>, String>() {

    private static final long serialVersionUID = 1L;

    @Override
    public String map(Tuple2<BSONWritable, BSONWritable> record) throws Exception {
        BSONWritable value = record.getField(1);
        BSONObject doc = value.getDoc();
        BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");
        String type = jsonld.getString("@type");
        return type;
    }
});

MongoConfigUtil.setOutputURI( hdIf.getJobConf(), "mongodb://localhost:27017/test.test");
fin.output(new HadoopOutputFormat<BSONWritable,BSONWritable>(new MongoOutputFormat<BSONWritable,BSONWritable>(), hdIf.getJobConf()));

Obviously this doesn't work because I'm emitting Strings while trying to write BSONWritable... Can you show me a simple working example?

Best,
Flavio

On Thu, Nov 6, 2014 at 3:58 PM, Stephan Ewen <[hidden email]> wrote:
Hi Flavio!

I think the general method is the same as with the inputs.


You can then call

DataSet<Tuple2<BSONWritable, BSONWritable>> data = ...;

data.output(mongoOutput);
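For completeness, a hedged sketch of how mongoOutput could be constructed, mirroring the input wrapper from the earlier snippets; the output URI and the BSONWritable/BSONWritable typing are assumptions:

// Hedged sketch: wrap Mongo's Hadoop (mapred) OutputFormat in Flink's Hadoop-compatibility wrapper.
JobConf outConf = hdIf.getJobConf(); // reuse the JobConf of the input wrapper
MongoConfigUtil.setOutputURI(outConf, "mongodb://localhost:27017/test.out"); // assumed target collection
HadoopOutputFormat<BSONWritable, BSONWritable> mongoOutput =
    new HadoopOutputFormat<BSONWritable, BSONWritable>(
        new MongoOutputFormat<BSONWritable, BSONWritable>(), outConf);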


Greetings,
Stephan


On Thu, Nov 6, 2014 at 3:41 PM, Flavio Pompermaier <[hidden email]> wrote:
Any help here..?

On Wed, Nov 5, 2014 at 6:39 PM, Flavio Pompermaier <[hidden email]> wrote:
Just shared the example at https://github.com/okkam-it/flink-mongodb-test and tweeted :)

The next step is to show how to write the result of a Flink process back to Mongo. 
How can I manage to do that? Can someone help me?
