Flink Mongodb

Flink Mongodb

Flavio Pompermaier
Hi to all,

I found a blog post about using MongoDB with Flink, but it uses the old APIs (HadoopDataSource instead of DataSource).
How can I use MongoDB with the new Flink APIs?

Best,
Flavio

Re: Flink Mongodb

Fabian Hueske
Hi,

the blog post uses Flink's wrapper for Hadoop InputFormats.
This has been ported to the new API and is described in the documentation.

So you just need to take Mongo's Hadoop IF and plug it into the new IF wrapper. :-)

Fabian
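
For readers following along, a minimal sketch of that wrapping with the new DataSet API might look as follows (assuming the mongo-hadoop connector's MongoInputFormat, the Hadoop-compatibility package layout of Flink 0.7, and a made-up connection URI):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.bson.BSONObject;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The Job object only carries the Hadoop Configuration for the wrapped InputFormat.
        Job job = new Job();
        // Hypothetical connection URI; point it at your own database and collection.
        MongoConfigUtil.setInputURI(job.getConfiguration(), "mongodb://localhost:27017/mydb.mycollection");

        // Wrap the Hadoop mapreduce InputFormat; records arrive as Tuple2<key, value>.
        HadoopInputFormat<Object, BSONObject> hdIf = new HadoopInputFormat<Object, BSONObject>(
            new MongoInputFormat(), Object.class, BSONObject.class, job);

        DataSet<Tuple2<Object, BSONObject>> input = env.createInput(hdIf);
        input.print();

        env.execute("Read from MongoDB");
    }
}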


Re: Flink Mongodb

Flavio Pompermaier
Should I start from http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html? Is that the right place?
So, in principle, HBase's TableInputFormat could also be used in a similar way, couldn't it?



Re: Flink Mongodb

Fabian Hueske
There's a page about Hadoop Compatibility that shows how to use the wrapper. 

The HBase format should work as well, but might lack support for local split assignment. In that case performance would suffer a lot.



Re: Flink Mongodb

Flavio Pompermaier
What do you mean by "might lack support for local split assignment"?
Do you mean that the InputFormat is not serializable? And is this not an issue for MongoDB?



Re: Flink Mongodb

Flavio Pompermaier
I don't know if that is possible anymore...

AzureTableInputFormat extends InputFormat<Text, WritableEntity> while MongoInputFormat extends InputFormat<Object, BSONObject>

and thus I cannot do the following:

HadoopInputFormat<Object, BSONObject> hdIf = new HadoopInputFormat<Object, BSONObject>(
    new MongoInputFormat(), Object.class, BSONObject.class, new Job());

Am I doing something wrong, or is this a problem with Flink?




Re: Flink Mongodb

Flavio Pompermaier
Sorry, I was looking at the wrong MongoInputFormat. The correct one is this:

It works with both Avro and Kryo as the default serializer (see GenericTypeInfo.createSerializer()).
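
Continuing the sketch shown after Fabian's first reply (same env and hdIf; org.apache.flink.api.common.functions.MapFunction must be imported), a hedged example of consuming the wrapped format; the "title" field is made up, and BSONObject values that flow between operators fall back to the generic serializer mentioned above:

// Extract plain fields early so downstream operators work on tuples of basic types
// instead of generic BSONObject records.
DataSet<Tuple2<Object, BSONObject>> docs = env.createInput(hdIf);

DataSet<Tuple2<String, String>> titles = docs.map(
    new MapFunction<Tuple2<Object, BSONObject>, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(Tuple2<Object, BSONObject> record) {
            // record.f0 is the document key (e.g. the ObjectId), record.f1 the BSON document.
            String id = record.f0.toString();
            String title = String.valueOf(record.f1.get("title"));   // hypothetical field
            return new Tuple2<String, String>(id, title);
        }
    });

titles.print();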



Re: Flink Mongodb

Stephan Ewen
Absolutely, please share the example!



Re: Flink Mongodb

Flavio Pompermaier
OK! I hope to write a blog post about it by tomorrow evening!



Re: Flink Mongodb

Fabian Hueske
That would be great! :-)



Re: Flink Mongodb

Fabian Hueske
In reply to this post by Flavio Pompermaier
Local split assignment preferentially assigns input splits to workers that can read the split's data locally.
For example, HDFS stores file chunks (blocks) distributed over the cluster and gives every worker access to these chunks via network transfer. However, if a chunk is read by a process that runs on the same node where the chunk is stored, the read operation directly accesses the local file system without going over the network. Hence, it is essential to assign input splits based on the locality of their data if you want reasonable performance. We call this local split assignment. It is a general concept in all data-parallel systems, including Hadoop, Spark, and Flink.

This issue is not related to the serializability of input formats.
I assume that the wrapped MongoIF is also not capable of local split assignment.
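
To make that concrete in Flink terms: a format that supports local split assignment creates splits that carry the hostnames of the machines storing their data, and a locality-aware split assigner then prefers handing each split to a worker on one of those hosts. A simplified sketch, assuming Flink's LocatableInputSplit and a hypothetical hostsPerRegion array (e.g. the region servers of an HBase table):

import org.apache.flink.core.io.LocatableInputSplit;

// Simplified sketch: each split records the hosts that store its data,
// so the split assigner can keep reads node-local whenever possible.
static LocatableInputSplit[] createSplits(String[][] hostsPerRegion) {
    LocatableInputSplit[] splits = new LocatableInputSplit[hostsPerRegion.length];
    for (int i = 0; i < hostsPerRegion.length; i++) {
        splits[i] = new LocatableInputSplit(i, hostsPerRegion[i]);
    }
    return splits;
}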



Re: Flink Mongodb

Flavio Pompermaier
Ok, thanks for the explanation! 
That is more or less what I thought, but there are still a couple of points I'd like to clarify:

1 - What if one region is very big and the other regions are very small? There will be one slot that takes a very long time while the others stay idle.
2 - Do you think it is possible to implement this in an adaptive way (stop processing a huge region if it is worth it and assign the remaining data to idle task managers)?



Re: Flink Mongodb

Fabian Hueske
ad 1) HBase manages the regions and should also take care of keeping their sizes uniform.
ad 2) Dynamically changing InputSplits is not possible at the moment. However, the input split generation of the IF should be able to handle such issues upfront. In fact, the IF could also generate multiple splits per region (this would be necessary anyway to make sure that the minimum number of splits is generated if there are fewer regions than required splits).
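
A rough sketch of what subdividing a region into several splits could look like, purely illustrative (real HBase row keys are byte arrays; a long-based key range is used here only to keep the arithmetic simple):

// Illustrative only: cut one region's key range [startKey, endKey) into n sub-ranges,
// so the input format can emit several smaller splits per region.
static long[][] subdivideRegion(long startKey, long endKey, int numSubSplits) {
    long[][] ranges = new long[numSubSplits][2];
    long width = (endKey - startKey) / numSubSplits;
    for (int i = 0; i < numSubSplits; i++) {
        long from = startKey + (long) i * width;
        long to = (i == numSubSplits - 1) ? endKey : from + width;
        ranges[i][0] = from;
        ranges[i][1] = to;
    }
    return ranges;
}

For example, a region covering [0, 1000) with numSubSplits = 4 yields the sub-ranges [0, 250), [250, 500), [500, 750), and [750, 1000).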



Re: Flink Mongodb

Flavio Pompermaier
From what I know, HBase manages the regions, but whether they are evenly filled depends on a well-designed row key;
if that is not the case you can end up with very unbalanced regions (i.e. hot spotting).

Could it be a good idea to create a split policy that compares the sizes of all the splits and generates equally sized splits that can be reassigned to a free worker if the originally assigned one is still busy?



Re: Flink Mongodb

Fabian Hueske
Hmm, that's a good question indeed. I am not that familiar with HBase's mode of operation.
I would assume that HBase uses range partitioning to partition a table into regions. That way it is rather easy to balance the size of regions, as long as there is no single key that occurs very often. I am not sure if it is possible to overcome data skew caused by frequent keys.
However, as I said, these are just assumptions. I will have a look at HBase's internals for verification.

In any case, Flink does currently not support reassigning or splitting InputSplits at runtime.
Also, initially generating balanced InputSplits will be tricky. That would be possible if we could efficiently determine the "density" of a key range when creating the InputSplits. However, I'm a bit skeptical that this can be done...



Re: Flink Mongodb

Stephan Ewen

InputSplits are assigned lazily at runtime, which gives you many of the benefits of re-assigning without the nastiness.

Can you write the logic that creates the splits such that it creates multiple splits per region? Then the lazy assignment will make sure that workers that get a larger split will get additional splits than workers that get smaller splits...



Re: Flink Mongodb

Stephan Ewen
In reply to this post by Flavio Pompermaier

Typo: it should have meant that workers that get a larger split will get fewer additional splits.



Re: Flink Mongodb

Fabian Hueske
I agree. Going for more splits with smaller key regions is a good idea.
However, it might be a bit difficult to determine a good number of splits, as the size of a split depends on its density. Splits that are too large are prone to cause data skew; splits that are too small increase the overhead of split assignment.

A solution to this problem could be to add an optional parameter to the IF that gives an upper bound on the number of InputSplits.
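
A hedged sketch of how such an upper bound could enter the split generation (the parameter names and the surrounding input format are hypothetical):

// Hypothetical helper for the input format's split generation:
// honor the requested parallelism, but never exceed the user-provided cap.
static int splitsPerRegion(int numRegions, int minNumSplits, int maxNumSplits) {
    // enough sub-splits per region to reach the requested minimum number of splits ...
    int perRegion = (int) Math.ceil((double) minNumSplits / numRegions);
    // ... but bounded so that numRegions * result stays within the optional maximum
    int capPerRegion = Math.max(1, maxNumSplits / numRegions);
    return Math.min(perRegion, capPerRegion);
}

With 10 regions, minNumSplits = 40, and maxNumSplits = 20, this yields 2 sub-splits per region instead of 4.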



Re: Flink Mongodb

Fabian Hueske
By the way, I found this blog post that describes HBase regions and region splitting: http://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/




Re: Flink Mongodb

Flavio Pompermaier
In reply to this post by Fabian Hueske
So how are we going to proceed here? Is someone willing to help me improve the splitting policy, or do we leave it as it is for now?
