Re: data enrichment with SQL use case
Posted by
Fabian Hueske-2 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/data-enrichment-with-SQL-use-case-tp19520p19721.html
Hi Alex,
An operator that has to join two input streams obviously requires two inputs. In the case of an enrichment join, the operator should first read the meta data stream and build up a data structure as state against which the other input is joined. If the meta data is (infrequently) updated, these updates should be integrated into the state.
The problem is that it is currently not possible to implement such an operator with Flink because operators cannot decide from which input to read, i.e., they have to process whatever data is given to them.
Hence, it is not possible to build up a data structure from the meta data stream before consuming the other stream.
There are a few workarounds that work in special cases.
1) The meta data is rather small and never updated. You put the meta data as a file into a (distributed) file system and read it from each function instance when it is initialized, i.e., in open(), and put it into a hash map. Each function instance will hold the complete meta data in memory (on the heap). Since the meta data is effectively broadcast (every instance holds a full copy), the other stream does not need to be partitioned to join against the meta data in the hash map. You can implement this function as a RichFlatMapFunction (the rich variant is the one that provides open()) or as a ProcessFunction.
2) The meta data is too large to hold in every function instance and/or is updated. In this case, you need a function with two inputs. Both inputs are keyed (keyBy()) on the join attribute. Since you cannot hold back the non-meta data stream, you need to buffer it in (keyed) state until you've read the meta data stream up to a point where you can start processing the other stream. If the meta data is updated at some point, you can just add the new data to the state. The benefit of this approach is that the state is sharded across all parallel operator instances and can be updated. However, you might need to initially buffer quite a bit of data in state if the non-meta data stream has a high volume.
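To make workaround 1 a bit more concrete, here is a rough sketch of the logic in plain Java, without any Flink dependency. The class name StaticEnricher, the "key,value" line format, and the "|" separator are made up for illustration. In a real job the open() logic would go into the open() method of a rich function and the file would be read from your (distributed) file system.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical plain-Java stand-in for one parallel function instance. */
class StaticEnricher {
    // Complete copy of the meta data, held on the heap of this instance.
    private final Map<String, String> metaByKey = new HashMap<>();

    /** Mirrors open(): load all meta data once, before any record is processed. */
    void open(Reader metaSource) {
        try (BufferedReader in = new BufferedReader(metaSource)) {
            String line;
            while ((line = in.readLine()) != null) {
                int comma = line.indexOf(',');
                if (comma < 0) {
                    continue; // skip lines that are not in the assumed "key,value" format
                }
                metaByKey.put(line.substring(0, comma), line.substring(comma + 1));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Mirrors the per-record call: look up the key and emit at most one enriched record. */
    Optional<String> enrich(String key, String payload) {
        String meta = metaByKey.get(key);
        return meta == null ? Optional.empty() : Optional.of(payload + "|" + meta);
    }
}
```

Because every instance has the full map, it does not matter which instance receives a given record, so no keyBy() is needed on the main stream.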
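The buffering in workaround 2 could look roughly like the following plain-Java sketch. It is only a simulation of the idea: the two maps stand in for Flink's keyed state (which Flink scopes to the current key for you), and the names BufferingEnricher, onMeta and onEvent are hypothetical. In a Flink job the two methods would correspond to the two processElement methods of a two-input function such as CoProcessFunction on connected, keyed streams. The sketch also assumes, as a simplification, that the first meta data record for a key is enough to start processing that key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of a keyed two-input enrichment operator. */
class BufferingEnricher {
    // Stand-in for keyed state holding the latest meta data per join key.
    private final Map<String, String> metaState = new HashMap<>();
    // Stand-in for keyed state buffering records that arrived before their meta data.
    private final Map<String, List<String>> bufferedEvents = new HashMap<>();

    /** Meta data input: store or update the state, then flush any buffered records. */
    List<String> onMeta(String key, String meta) {
        metaState.put(key, meta);
        List<String> pending = bufferedEvents.remove(key);
        if (pending == null) {
            return Collections.emptyList();
        }
        List<String> out = new ArrayList<>();
        for (String payload : pending) {
            out.add(payload + "|" + meta);
        }
        return out;
    }

    /** Main input: join immediately if meta data is known, otherwise buffer the record. */
    List<String> onEvent(String key, String payload) {
        String meta = metaState.get(key);
        if (meta == null) {
            bufferedEvents.computeIfAbsent(key, k -> new ArrayList<>()).add(payload);
            return Collections.emptyList();
        }
        return Collections.singletonList(payload + "|" + meta);
    }
}
```

Note that the buffer grows for as long as a key has no meta data, which is the state-size concern mentioned above for high-volume streams.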
Hope that one of these approaches works for your use case.
Best, Fabian