Hi to all,
as discussed briefly with Fabian, for our products in Okkam we need a central repository of DataSources processed by Flink. With respect to existing external catalogs, such as Hive or Confluent's SchemaRegistry, whose objective is to provide necessary metadata to read/write the registered tables, we would also need a way to acess to other general metadata (e.g. name, description, creator, creation date, lastUpdate date, processedRecords, certificationLevel of provided data, provenance, language, etc). This integration has 2 main goals:
We also think that this could be of interest for projects like Apache Zeppelin or Nifi enabling them to suggest to the user the sources to start from. Do you think it makes sense to think about designing such a module for Flink? Best, Flavio |
Hi Flavio, I thought a bit about your proposal. I am not sure if it is actually necessary to integrate a central source repository into Flink. It should be possible to offer this as an external service which is based on the recently added TableSource interface. TableSources could be extended to be able to serialize and descerialize their configuration to/from JSON. When the external repository service starts, it can read the JSON fields and instantiate and register TableSource objectes. The repository could also hold metadata about the sources and serve a (web) UI to list available source. When a Flink program wants to access a data source which is registered in the repository, it could lookup the respective TableSouce object from the repository. Given that an integration of metadata with Flink user functions (point 2. in your proposal) is a very special requirement, I am not sure how much "native" support should be added to Flink. Would it be possible to add a lineage tag to each record and ship the metadata of all sources as broadcast set to each operator? Then user functions could lookup the metadata from the broadcast set. Best, Fabian 2016-04-29 12:49 GMT+02:00 Flavio Pompermaier <[hidden email]>:
|
HI Fabian, thanks for your detailed answer, as usual ;)However, registering metadata for a table is a very common task to do. Wouldn't be of useful for other Flink-related projects (I was thinking to Nifi for example) to define a common minimal set of (optional) metadata to display in a UI for a TableSource (like name, description, creationDate, creator, field aliases)? About point 2, I think that dataset broadcasting or closure variables are useful when you write a program, not if you try to "compose" it using reusable UDFs (using a script like in Pig). Of course, the worst case scenario for us (e.g. right now) is to connect to our repository within rich operators but I thought that it could be easy to define a link from operators to TableEnvironment and then to TableSource (using the lineage tag/source-id you said) and, finally to its metadata. I don't know whether this is specific only to us, I just wanted to share our needs and see if the table API development could benefit from them. Best, Flavio On Wed, May 4, 2016 at 10:35 AM, Fabian Hueske <[hidden email]> wrote:
|
Hi Flavio, I'll open a JIRA for de/serializing TableSource to textual JSON. TableEnvironment tEnv = ... YourTableSource ts = Catalog.getTableSource("someIdentifier"); tEnv.registerTableSource("someId", ts); // preparing meta data MetaData meta = ts.getMetaData() DataSet<MetaData> metaDS = env.fromElements(meta); // read data, table transformations + conversion to DataSet Table t = tEnv.scan("someId"); // apply some Table transformations if necessary DataSet<TupleX<...>> ds = tEnv.toDataSet(t, TupleX); // apply custom functions on data set ds.map(MetaMapFunctionWrapper(new MetaMapFunction())).withBroadcastSet(metaDS, "meta"); // continue program } The YourMapFunctionWrapper could be a RichMapFunction that accesses the meta data from the broadcasted set and provides it to a wrapped MetaMapFunction (an extended MapFunction with custom interface for meta data). Depending on what kind of interface you plan to offer, you can hide most of the complexity, e.g, users would only have to implement a MetaMapFunction not have to deal with the broadcasting and accessing of meta data (this would be done by your wrapper). Fabian 2016-05-05 10:08 GMT+02:00 Flavio Pompermaier <[hidden email]>:
|
That was more or less what I was thinking. The only thing I'm not sure is the usage of the broadcasted dataset, since I'd need to access tot the MetaData dataset by sourceId (so I'd need an Map<String, Metadata>. Probably I'd do:... ds.map(MetaMapFunctionWrapper(new MetaMapFunction(meta))) On Fri, May 6, 2016 at 12:06 PM, Fabian Hueske <[hidden email]> wrote:
|
Yes, you can transform the broadcast set when it is accessed with RuntimeContext.getBroadcastVariableWithInitializer() and a BroadcastVariableInitializer. 2016-05-06 14:07 GMT+02:00 Flavio Pompermaier <[hidden email]>:
|
Free forum by Nabble | Edit this page |