feature request: broadcast POJO objects as part of runtime context


Stefano Bortoli
Hi,

Trying to run some legacy code as part of a Flink job, I had to replicate configuration files across my cluster. Not a big deal with a small cluster, but it would be nice to have these configuration objects broadcastable. Namely, it would be nice to reuse the old "read from conf file" logic to build objects that could then be serialized and used during processing through the broadcast mechanism.

Do you think this is possible? With the new Kryo serialization it should not be extremely complicated.

saluti,
Stefano



Re: feature request: broadcast POJO objects as part of runtime context

Stephan Ewen
Hi!

I hope I understand correctly what you are trying to do. To have a config file available in your functions, you can simply do either of the following:

-----------------------
Closure
-----------------------

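final Configuration conf = ...;

data.map(new RichMapFunction<String, Integer>() {

  @Override
  public void open(Configuration c) throws Exception {
     // access the captured conf object here; it is shipped to the workers
     // as part of the serialized function
     conf.getString(...);
  }

  @Override
  public Integer map(String value) throws Exception {
    // whatever
  }
});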
-----------------------
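The closure variant works because the function object, including anything it captures, is serialized on the client and deserialized in each parallel task, so a captured object has to be java.io.Serializable. The plain-Java sketch below only imitates that shipping step with a Java-serialization round trip; it does not use Flink, and the class names LegacySettings and MatchFunction as well as the Solr URL are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureShippingDemo {

    // Hypothetical settings object built on the client, e.g. by legacy "read from conf file" code.
    // It must be Serializable because it is shipped inside the function object.
    static class LegacySettings implements Serializable {
        private static final long serialVersionUID = 1L;
        final String solrUrl;
        final double matchThreshold;

        LegacySettings(String solrUrl, double matchThreshold) {
            this.solrUrl = solrUrl;
            this.matchThreshold = matchThreshold;
        }
    }

    // Stand-in for a user function: it keeps the captured settings in a field,
    // much like an anonymous RichMapFunction keeps a captured local variable.
    static class MatchFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        final LegacySettings settings;

        MatchFunction(LegacySettings settings) {
            this.settings = settings;
        }

        boolean matches(double score) {
            return score >= settings.matchThreshold;
        }

        String endpoint() {
            return settings.solrUrl;
        }
    }

    // Serialize and deserialize an object, roughly what happens when a function is shipped to a worker.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LegacySettings settings = new LegacySettings("http://solr.example:8983/solr", 0.8);
        MatchFunction onWorker = roundTrip(new MatchFunction(settings));
        System.out.println(onWorker.endpoint());
        System.out.println(onWorker.matches(0.9));
        System.out.println(onWorker.matches(0.5));
    }
}
```

The deserialized copy carries the same settings values as the original, which is what each task would see.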


-----------------------
Config Parameters
-----------------------

Configuration conf = ...;

data.map(new RichMapFunction<String, Integer>() {

  @Override
  public void open(Configuration c) throws Exception {
     // access c - it will have all entries of conf - see withParameters() below
     c.getString(...);
  }

  @Override
  public Integer map(String value) throws Exception {
    // whatever
  }
})
.withParameters(conf);
-----------------------
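withParameters() hands the function a Configuration, which holds flat key/value entries rather than arbitrary objects. Assuming the legacy settings live in a .properties-style file, one minimal sketch is to parse them on the client into string pairs; in the job, each pair could then be copied into the Configuration with setString(key, value) before calling withParameters(conf). The sketch below is plain Java only, and the class name and keys are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class LegacyPropertiesToParameters {

    // Parses the text of a legacy .properties file into flat string key/value pairs,
    // sorted by key so the result is deterministic.
    static Map<String, String> toParameters(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            // load() declares IOException; a StringReader is not expected to throw it
            throw new IllegalStateException(e);
        }
        Map<String, String> params = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            params.put(key, props.getProperty(key));
        }
        return params;
    }

    public static void main(String[] args) {
        String legacy = "solr.url = http://solr.example:8983/solr\n"
                + "# comment lines are ignored\n"
                + "match.threshold = 0.8\n";
        System.out.println(toParameters(legacy));
    }
}
```

The trade-off is the one Stefano mentions later in the thread: anything richer than strings and numbers has to be rebuilt from these entries inside open().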


Stephan


(In reply to Stefano Bortoli, Mon, Nov 10, 2014 at 5:36 PM)




Re: feature request: broadcast POJO objects as part of runtime context

Flavio Pompermaier

I think Stefano was asking for a way to pass a generic configuration object, not just Flink's Configuration class.
For example, in our use case it would be helpful to broadcast generic objects/POJOs around.
Is that possible?

(In reply to Stephan Ewen, Nov 10, 2014 at 6:46 PM)




Re: feature request: broadcast POJO objects as part of runtime context

Stephan Ewen
You can put generic objects in the closure as well, just as you can put a configuration in the closure.

You can also distribute your objects into the cluster and then use them as a broadcast variable:

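------------------------
// a small DataSet holding your object(s); MyType must be a type Flink can handle
DataSet<MyType> aux = env.fromElements(new MyType());

// every parallel instance of MyMapper receives the complete 'aux' data set under this name
someOtherData.map(new MyMapper()).withBroadcastSet(aux, "my broadcast data");

// inside MyMapper (a RichMapFunction), e.g. in open():
// List<MyType> items = getRuntimeContext().getBroadcastVariable("my broadcast data");
------------------------
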
Stephan


(In reply to Flavio Pompermaier, Mon, Nov 10, 2014 at 7:32 PM)





Re: feature request: broadcast POJO objects as part of runtime context

Flavio Pompermaier

But does MyType have to be serializable, or can it be of any type?

(In reply to Stephan Ewen, Nov 10, 2014 at 8:06 PM)


Re: feature request: broadcast POJO objects as part of runtime context

Stephan Ewen

Any type Flink supports as a data type.
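The answer differs between the two routes discussed above. With withBroadcastSet() the elements travel as regular data, which is why the requirement is "any type Flink supports"; with the closure route, as far as I understand it, the captured object travels inside the Java-serialized function, so a captured field whose type is not Serializable makes shipping fail. The plain-Java sketch below demonstrates only that closure failure case; LegacyClient and HoldsClient are hypothetical stand-ins for a legacy client and a user function.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableRequirementDemo {

    // Hypothetical legacy client that was never designed to be serialized.
    static class LegacyClient {
        final String endpoint;

        LegacyClient(String endpoint) {
            this.endpoint = endpoint;
        }
    }

    // Hypothetical user function that captures the client directly in a field.
    static class HoldsClient implements Serializable {
        private static final long serialVersionUID = 1L;
        final LegacyClient client;

        HoldsClient(LegacyClient client) {
            this.client = client;
        }
    }

    // Returns null if Java serialization succeeds, otherwise the exception it raised.
    static Exception trySerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (IOException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        Exception failure = trySerialize(new HoldsClient(new LegacyClient("http://solr.example:8983/solr")));
        System.out.println(failure == null
                ? "serialized fine"
                : failure.getClass().getSimpleName() + ": " + failure.getMessage());
    }
}
```

Even though HoldsClient itself implements Serializable, the non-serializable field makes the whole object graph fail with a NotSerializableException.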

(In reply to Flavio Pompermaier, 10.11.2014 20:19)





Re: feature request: broadcast POJO objects as part of runtime context

Stefano Bortoli
Thanks a lot for the clarification. The point is that I don't deal with the parameters directly in the map function. They are just things I need to pass down to the classes I use to implement the map logic, reusing existing code. For example, to access a global index I need configuration for the Solr client, and to implement the matching function I need to read some other parameters. We usually run that logic in an application server, so the environment is always there. However, I would like to be elastic in allocating nodes for Flink without having to replicate the configuration on every server where I start a task manager.

I know I would have to re-implement all the initializations to use the standard configuration parameter system. However, it would be nice to just be able to broadcast some objects to ease deployment and allocation of servers. :-) Maybe we could just use ZooKeeper to make it easy, but again, I would have to adapt all the configuration, which I would rather not do unless it is necessary.
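One way to keep most of the legacy initialization code might be to read the configuration file once on the client, wrap the values in a small Serializable object, ship that object with the function (through the closure or as a broadcast set), and construct the non-serializable client on the worker in open(). The plain-Java sketch below imitates that: the transient client field is skipped during serialization and rebuilt by an open() method modeled on RichFunction.open(). SolrSettings, LegacySolrClient and MatchingFunction are hypothetical names, and no Flink classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientClientPattern {

    // Hypothetical serializable settings, built once on the client from the legacy config file.
    static class SolrSettings implements Serializable {
        private static final long serialVersionUID = 1L;
        final String url;
        final int timeoutMillis;

        SolrSettings(String url, int timeoutMillis) {
            this.url = url;
            this.timeoutMillis = timeoutMillis;
        }
    }

    // Hypothetical non-serializable legacy client, constructed only from the settings.
    static class LegacySolrClient {
        final String url;
        final int timeoutMillis;

        LegacySolrClient(SolrSettings s) {
            this.url = s.url;
            this.timeoutMillis = s.timeoutMillis;
        }
    }

    // Mimics a rich function: the settings are shipped, the client is transient and rebuilt in open().
    static class MatchingFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final SolrSettings settings;
        private transient LegacySolrClient client; // skipped by serialization, null after shipping

        MatchingFunction(SolrSettings settings) {
            this.settings = settings;
        }

        // Would run once per parallel instance on the worker, like RichFunction.open(Configuration).
        void open() {
            client = new LegacySolrClient(settings);
        }

        String describe() {
            return client == null ? "not opened" : client.url + " (" + client.timeoutMillis + " ms)";
        }
    }

    // Java-serialization round trip standing in for shipping the function to a worker.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        MatchingFunction shipped = roundTrip(
                new MatchingFunction(new SolrSettings("http://solr.example:8983/solr", 5000)));
        System.out.println(shipped.describe()); // the client was not shipped
        shipped.open();                         // rebuilt from the shipped settings
        System.out.println(shipped.describe());
    }
}
```

The design choice here is to ship only the small, serializable description of the client, so the legacy classes themselves never need to become serializable.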

Thanks a lot for your support and clarifications.

saluti,
Stefano

(In reply to Stephan Ewen, 2014-11-10 21:11 GMT+01:00)