Discussion about a Flink DataSource repository


Discussion about a Flink DataSource repository

Flavio Pompermaier
Hi to all,

as discussed briefly with Fabian, for our products at Okkam we need a central repository of the DataSources processed by Flink.
Compared to existing external catalogs, such as Hive or Confluent's Schema Registry, whose objective is to provide the metadata needed to read/write the registered tables, we would also need a way to access other general metadata (e.g. name, description, creator, creation date, last-update date, processed records, certification level of the provided data, provenance, language, etc.).

This integration has two main goals:
  1. In a UI: enable the user to choose (or even create) a data source to process with some task (e.g. quality assessment) and then see its metadata (name, description, creator, etc.)
  2. During a Flink job: when two data sources get joined and we have multiple values for an attribute (e.g. name or last name), access the data source metadata to decide which value to retain (e.g. the one coming from the most authoritative/certified source for that attribute); a rough sketch of this follows below
We also think this could be of interest for projects like Apache Zeppelin or NiFi, enabling them to suggest to the user which sources to start from.
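
To make point 2 a bit more concrete, this is roughly the kind of resolution logic we have in mind (just a sketch; Metadata and certificationLevel are made-up names for whatever the repository would expose per source):

// Hypothetical conflict resolution: keep the value coming from the
// more certified / authoritative source for this attribute.
public static String resolveAttribute(String leftValue, Metadata leftSourceMeta,
                                      String rightValue, Metadata rightSourceMeta) {
    return leftSourceMeta.getCertificationLevel() >= rightSourceMeta.getCertificationLevel()
        ? leftValue
        : rightValue;
}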

Do you think it makes sense to think about designing such a module for Flink?

Best,
Flavio

Re: Discussion about a Flink DataSource repository

Fabian Hueske-2
Hi Flavio,

I thought a bit about your proposal. I am not sure it is actually necessary to integrate a central source repository into Flink. It should be possible to offer this as an external service based on the recently added TableSource interface. TableSources could be extended to serialize and deserialize their configuration to/from JSON. When the external repository service starts, it can read the JSON definitions and instantiate and register TableSource objects. The repository could also hold metadata about the sources and serve a (web) UI to list the available sources. When a Flink program wants to access a data source that is registered in the repository, it could look up the respective TableSource object from the repository.
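
Just to sketch the idea (none of these interfaces exist, the names are made up, and the TableSource signature is simplified):

// Hypothetical extension of a TableSource: a source that can write its
// configuration as JSON and be re-configured from it by the repository service.
public interface JsonConfigurableTableSource {
    String serializeConfig();                   // configuration -> JSON
    void configureFromJson(String jsonConfig);  // JSON -> configuration
}

// Hypothetical repository-side service: stores the JSON definitions plus
// free-form metadata and hands out ready-to-register TableSource objects.
public interface SourceRepository {
    TableSource getTableSource(String identifier);
    java.util.Map<String, String> getMetadata(String identifier);
}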

Given that integrating metadata with Flink user functions (point 2 of your proposal) is a very specific requirement, I am not sure how much "native" support should be added to Flink. Would it be possible to add a lineage tag to each record and ship the metadata of all sources as a broadcast set to each operator? Then user functions could look up the metadata from the broadcast set.
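
The lineage tag could be as simple as wrapping each record with the id of the source it came from (again, just a sketch with made-up names):

// Hypothetical wrapper type: every record carries the id of its source,
// so a user function can look that id up in the broadcast metadata set.
public class Tagged<T> {
    public String sourceId;  // lineage tag, e.g. the repository identifier
    public T record;

    public Tagged() {}  // public no-arg constructor needed for Flink POJO serialization
    public Tagged(String sourceId, T record) {
        this.sourceId = sourceId;
        this.record = record;
    }
}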

Best, Fabian


Re: Discussion about a Flink DataSource repository

Flavio Pompermaier
Hi Fabian,
thanks for your detailed answer, as usual ;)

I think an external service is fine; actually, I wasn't aware of the TableSource interface.
As you said, a utility to serialize and deserialize TableSources would be very helpful and would make this much easier.
However, registering metadata for a table is a very common task. Wouldn't it be useful for other Flink-related projects (I was thinking of NiFi, for example) to define a common minimal set of (optional) metadata to display in a UI for a TableSource (like name, description, creation date, creator, field aliases)?
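
I mean something as simple as this (just a sketch of the kind of interface I have in mind, nothing that exists today):

import java.util.Date;
import java.util.Map;

// Hypothetical optional metadata a TableSource could expose for UIs.
public interface DescribableTableSource {
    String getName();
    String getDescription();
    String getCreator();
    Date getCreationDate();
    Map<String, String> getFieldAliases();  // field name -> human-readable alias
}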

About point 2, I think that broadcast datasets or closure variables are useful when you write a program by hand, not when you try to "compose" it from reusable UDFs (with a script, as in Pig).
Of course, the worst-case scenario for us (what we do right now) is to connect to our repository from within rich operators, but I thought it could be easy to define a link from operators to the TableEnvironment, then to the TableSource (using the lineage tag/source id you mentioned), and finally to its metadata. I don't know whether this is specific to us; I just wanted to share our needs and see whether the Table API development could benefit from them.

Best,
Flavio


Re: Discussion about a Flink DataSource repository

Fabian Hueske-2
Hi Flavio,

I'll open a JIRA for de/serializing TableSource to textual JSON.

Would something like this work for you?

public static void main(String[] args) throws Exception {
  ExecutionEnvironment env = ...
  TableEnvironment tEnv = ...
 
  // accessing an external catalog
  YourTableSource ts = Catalog.getTableSource("someIdentifier");
  tEnv.registerTableSource("someId", ts);
 
  // preparing meta data
  MetaData meta = ts.getMetaData();
  DataSet<MetaData> metaDS = env.fromElements(meta);
 
  // read data, table transformations + conversion to DataSet
  Table t = tEnv.scan("someId"); // apply some Table transformations if necessary
  DataSet<TupleX<...>> ds = tEnv.toDataSet(t, TupleX.class);

  // apply custom functions on data set
  ds.map(new MetaMapFunctionWrapper(new MetaMapFunction())).withBroadcastSet(metaDS, "meta");

  // continue program

}

The MetaMapFunctionWrapper could be a RichMapFunction that accesses the metadata from the broadcast set and provides it to the wrapped MetaMapFunction (an extended MapFunction with a custom interface for metadata).
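
A minimal sketch of how that wrapper could look (MetaMapFunction and MetaData are placeholders you would define, not Flink classes; the broadcast name "meta" matches the snippet above):

import java.util.List;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Wraps a user-supplied MetaMapFunction and hands it the broadcast metadata.
public class MetaMapFunctionWrapper<IN, OUT> extends RichMapFunction<IN, OUT> {

    private final MetaMapFunction<IN, OUT> wrapped;
    private transient List<MetaData> meta;

    public MetaMapFunctionWrapper(MetaMapFunction<IN, OUT> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void open(Configuration parameters) {
        // "meta" is the name used in withBroadcastSet(metaDS, "meta")
        this.meta = getRuntimeContext().getBroadcastVariable("meta");
    }

    @Override
    public OUT map(IN value) throws Exception {
        return wrapped.map(value, meta);  // user code sees the record plus the metadata
    }
}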

Depending on what kind of interface you plan to offer, you can hide most of the complexity, e.g., users would only have to implement a MetaMapFunction and would not have to deal with broadcasting and accessing the metadata (your wrapper would take care of that).

Fabian
 




Re: Discussion about a Flink DataSource repository

Flavio Pompermaier
That was more or less what I was thinking. The only thing I'm not sure about is the use of the broadcast dataset, since I'd need to access the MetaData dataset by source id (so I'd need a Map<String, Metadata>).
Probably I'd do:

Map<String, Metadata> meta = ...; // prepare the metadata lookup table
...
ds.map(new MetaMapFunctionWrapper(new MetaMapFunction(meta)));

What do you think? Is there a way to access a broadcast DataSet as a Map instead of a List?

Best,
Flavio



Re: Discussion about a Flink DataSource repository

Fabian Hueske-2
Yes, you can transform the broadcast set when it is accessed with RuntimeContext.getBroadcastVariableWithInitializer() and a BroadcastVariableInitializer.
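
For example, the wrapper's open() method from the earlier sketch could become roughly the following (assuming a Map<String, MetaData> metaById field instead of the List; MetaData and getSourceId() are again placeholders from this thread, imports from org.apache.flink.api.common.functions and java.util):

// Access the broadcast set through a BroadcastVariableInitializer that turns
// it into a Map keyed by source id, so the map can be built once and shared
// instead of being rebuilt by every parallel task instance.
@Override
public void open(Configuration parameters) {
    this.metaById = getRuntimeContext().getBroadcastVariableWithInitializer(
        "meta",
        new BroadcastVariableInitializer<MetaData, Map<String, MetaData>>() {
            @Override
            public Map<String, MetaData> initializeBroadcastVariable(Iterable<MetaData> data) {
                Map<String, MetaData> byId = new HashMap<>();
                for (MetaData md : data) {
                    byId.put(md.getSourceId(), md);
                }
                return byId;
            }
        });
}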
