Hi Eleanore,
A dynamic filter like the one you need is essentially a join operation.
There are two ways to do this:
* partitioning the key set and the message on the attribute. This would be done with a KeyedCoProcessFunction.
* broadcasting the key set and just locally forwarding the messages. This would be done with a BroadcastProcessFunction (or a KeyedBroadcastProcessFunction if the message stream is keyed).
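To make the broadcast variant a bit more concrete, here is a minimal plain-Java sketch of the logic that one parallel instance would run. It deliberately does not use the Flink API: `Message`, `DynamicFilter`, `onKeyUpdate`, and `onMessage` are hypothetical names, and the local set only stands in for what would be broadcast state in a real job. The two methods roughly correspond to `processBroadcastElement` and `processElement`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical message type: the attribute to filter on plus a payload.
class Message {
    final String attribute;
    final String payload;

    Message(String attribute, String payload) {
        this.attribute = attribute;
        this.payload = payload;
    }
}

// Plain-Java stand-in for one parallel instance of the broadcast variant.
class DynamicFilter {
    // In a Flink job this would be kept in broadcast state; here it is a local set.
    private final Set<String> allowedKeys = new HashSet<>();

    // Counterpart of processBroadcastElement: every instance receives every key update.
    void onKeyUpdate(String key, boolean add) {
        if (add) {
            allowedKeys.add(key);
        } else {
            allowedKeys.remove(key);
        }
    }

    // Counterpart of processElement: forward the message only if its
    // attribute is currently part of the key set.
    void onMessage(Message message, List<Message> out) {
        if (allowedKeys.contains(message.attribute)) {
            out.add(message);
        }
    }
}
```

Note that a message arriving before its key has been broadcast is dropped in this sketch. Whether such messages should be buffered instead depends on your requirements.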
The challenge in your application is that the key set entries have different types, which is something that Flink does not support very well.
There are two ways to go about this:
1) route all data through the same operators that can handle all types. You can model this with an n-ary Either type. Flink only has a binary Either type, so you would need to implement the TypeInformation, serializer, and comparator yourself. The Either classes should give you good guidance for that.
2) have different operators and flows for each basic data type. This will fan out your job, but should be the easier approach.
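For option 1, the data-model side of an n-ary Either could look roughly like the sketch below. It assumes, purely for illustration, that the key set entries are Strings, Longs, or Doubles; `KeyValue` and all of its methods are hypothetical names. The Flink-specific TypeInformation, serializer, and comparator are not shown and would still have to be written on top of it.

```java
import java.util.Objects;

// Hypothetical three-way union for key set entries of type String, Long, or Double.
final class KeyValue {
    enum Kind { STRING, LONG, DOUBLE }

    private final Kind kind;
    private final Object value;

    private KeyValue(Kind kind, Object value) {
        this.kind = kind;
        this.value = Objects.requireNonNull(value);
    }

    static KeyValue ofString(String v) { return new KeyValue(Kind.STRING, v); }
    static KeyValue ofLong(long v) { return new KeyValue(Kind.LONG, v); }
    static KeyValue ofDouble(double v) { return new KeyValue(Kind.DOUBLE, v); }

    Kind kind() { return kind; }

    String asString() { return (String) check(Kind.STRING); }
    long asLong() { return (Long) check(Kind.LONG); }
    double asDouble() { return (Double) check(Kind.DOUBLE); }

    // Fails fast when the caller asks for a different case than the one stored.
    private Object check(Kind expected) {
        if (kind != expected) {
            throw new IllegalStateException("Value is " + kind + ", not " + expected);
        }
        return value;
    }

    // equals and hashCode include the tag, so 1L and 1.0 are different keys.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof KeyValue)) return false;
        KeyValue other = (KeyValue) o;
        return kind == other.kind && value.equals(other.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(kind, value);
    }
}
```

The tag is what a custom serializer would write first, so that it knows which case to read back.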
Best, Fabian