Hi there,

I have a full DataStream pipeline (i.e. no higher-level APIs like Table or SQL are involved). In the middle of the pipeline, I now need to filter on a SQL WHERE expression, e.g. "user= 'pitter' AND age > 10", which can include REGEXP and arbitrarily nested AND/OR constructs.

I was wondering if I could somehow transform a SQL WHERE expression into a Flink FilterFunction?

My approach right now is to register my stream as a table, run a SQL query on it, and convert the result back to a DataStream, like so:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
//... test assertions

It feels a bit weird that I need to go all the way up to the SQL API and register a table "just" to apply a WHERE clause, and that I can no longer assign a uid or operator name to this operator once I leave the DataStream world.

Is the way I wrote it the best approach, or do you have a better idea? Are there any caveats here?

Note that I didn't assign the event time column on purpose: I know that it's just a WHERE without any windowing etc., and I wanted to test that it still works without any explicit time column :)

Best regards
Theo
Hi Theo,

From my understanding, this is currently hard to do in a DataStream application. The conversion of a SQL expression into a Flink operator happens inside the underlying table planner (more precisely, in the code generation phase), and the planner does not expose an interface for this to users. That is also why you cannot assign an operator name or operator id.

Best,
Leonard Xu
This is possible but may need some development. There is a similar utility in the table tests called `org.apache.flink.table.expressions.utils.ExpressionTestBase` [1]. It translates expressions (either Table API expressions or SQL expressions) into a MapFunction. I think you can imitate what ExpressionTestBase does and translate the expression into a FilterFunction instead.

Best,
Jark

On Fri, 5 Jun 2020 at 10:17, Leonard Xu <[hidden email]> wrote:
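For illustration only, here is a minimal plain-Java sketch of the general idea of turning a WHERE string into a predicate. It is not the planner-based code generation that ExpressionTestBase relies on; it is a small hand-written recursive-descent parser. The class name `WhereCompiler`, the `Map<String, Object>` row representation, and the supported subset (`=`, `<`, `>`, `REGEXP`, `AND`, `OR`, parentheses, single-quoted strings without escapes) are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: compiles a tiny WHERE subset into a Predicate over a field map. */
final class WhereCompiler {
    // Tokens: 'string', number, identifier/keyword, or one of = < > ( ).
    // Anything else (e.g. whitespace) is silently skipped in this sketch.
    private static final Pattern TOKEN =
        Pattern.compile("'[^']*'|\\d+(?:\\.\\d+)?|[A-Za-z_]\\w*|[=<>()]");

    private final List<String> tokens = new ArrayList<>();
    private int pos = 0;

    private WhereCompiler(String where) {
        Matcher m = TOKEN.matcher(where);
        while (m.find()) tokens.add(m.group());
    }

    static Predicate<Map<String, Object>> compile(String where) {
        WhereCompiler c = new WhereCompiler(where);
        Predicate<Map<String, Object>> result = c.parseOr();
        if (c.pos != c.tokens.size()) {
            throw new IllegalArgumentException("Unexpected token: " + c.tokens.get(c.pos));
        }
        return result;
    }

    private boolean accept(String token) {
        if (pos < tokens.size() && tokens.get(pos).equalsIgnoreCase(token)) { pos++; return true; }
        return false;
    }

    private String next() {
        if (pos >= tokens.size()) throw new IllegalArgumentException("Unexpected end of expression");
        return tokens.get(pos++);
    }

    // or := and (OR and)*   (OR binds weaker than AND)
    private Predicate<Map<String, Object>> parseOr() {
        Predicate<Map<String, Object>> left = parseAnd();
        while (accept("OR")) left = left.or(parseAnd());
        return left;
    }

    // and := primary (AND primary)*
    private Predicate<Map<String, Object>> parseAnd() {
        Predicate<Map<String, Object>> left = parsePrimary();
        while (accept("AND")) left = left.and(parsePrimary());
        return left;
    }

    // primary := '(' or ')' | field op literal
    private Predicate<Map<String, Object>> parsePrimary() {
        if (accept("(")) {
            Predicate<Map<String, Object>> inner = parseOr();
            if (!accept(")")) throw new IllegalArgumentException("Missing ')'");
            return inner;
        }
        String field = next();
        String op = next().toUpperCase();
        String literal = next();
        if (literal.startsWith("'")) {
            String text = literal.substring(1, literal.length() - 1);
            switch (op) {
                case "=":
                    return row -> text.equals(row.get(field));
                case "REGEXP": {
                    Pattern regex = Pattern.compile(text);
                    // True if any substring of the field value matches the regex.
                    return row -> row.get(field) instanceof String
                        && regex.matcher((String) row.get(field)).find();
                }
                default:
                    throw new IllegalArgumentException("Unsupported string operator: " + op);
            }
        }
        double number = Double.parseDouble(literal);
        switch (op) {
            case "=": return numeric(field, number, c -> c == 0);
            case ">": return numeric(field, number, c -> c > 0);
            case "<": return numeric(field, number, c -> c < 0);
            default: throw new IllegalArgumentException("Unsupported numeric operator: " + op);
        }
    }

    // Compares a numeric field with a constant; missing or non-numeric fields never match.
    private static Predicate<Map<String, Object>> numeric(String field, double n, IntPredicate test) {
        return row -> {
            Object v = row.get(field);
            return v instanceof Number && test.test(Double.compare(((Number) v).doubleValue(), n));
        };
    }
}
```

A predicate built this way could be called from an ordinary FilterFunction, so the operator stays in the DataStream API and can still be given a uid and name. Since the composed `Predicate` is not serializable, one option would be to ship only the expression string and call `WhereCompiler.compile` in the `open()` method of a RichFilterFunction. The trade-off is that none of the planner's SQL semantics (type coercion, NULL handling, built-in functions) come for free.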