SQL Expression to Flink FilterFunction?


Theo
Hi there,

I have a full DataStream pipeline (i.e. no higher-level APIs such as Table or SQL are involved). In the middle of the pipeline, I now need to filter on a SQL WHERE expression, e.g. "user = 'pitter' AND age > 10", which can include REGEXP and arbitrarily nested AND/OR constructs.

I was wondering if I could somehow transform a SQL WHERE expression into a Flink FilterFunction?
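For a restricted grammar, one alternative to the round trip through the Table API is to parse the WHERE clause yourself and compile it into a plain `java.util.function.Predicate` that a FilterFunction can delegate to. The following is a minimal sketch of that hand-written approach, not of how Flink's planner does it. Assumptions: the class name `WhereCompiler` is hypothetical; rows are modelled as `Map<String, Object>` instead of `SomePOJO`; only non-negative integer and single-quoted string literals are supported; and REGEXP is treated as a MySQL-style infix operator with substring (`Matcher.find()`) semantics, which may not be the semantics you need. Because the grammar has no NOT, treating a comparison on a missing (NULL) column as false keeps or drops exactly the same rows as SQL's three-valued logic would.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: compiles a small WHERE-clause subset into a Predicate over a row map. */
final class WhereCompiler {
    // One token: integer | 'string' ('' escapes a quote) | identifier/keyword | operator or parenthesis.
    private static final Pattern TOKEN = Pattern.compile(
            "(\\d+)|'((?:[^']|'')*)'|([A-Za-z_][A-Za-z0-9_]*)|(<=|>=|<>|[=<>()])");
    private static final Set<String> COMPARATORS = Set.of("=", "<>", "<", ">", "<=", ">=");
    private final List<String[]> tokens = new ArrayList<>(); // each entry: {kind, text}
    private int pos = 0;

    private WhereCompiler(String where) {
        Matcher m = TOKEN.matcher(where);
        int i = 0;
        while (true) {
            while (i < where.length() && Character.isWhitespace(where.charAt(i))) i++;
            if (i == where.length()) break;
            m.region(i, where.length());
            if (!m.lookingAt()) throw new IllegalArgumentException("Unexpected input at position " + i);
            if (m.group(1) != null) tokens.add(new String[] {"NUM", m.group(1)});
            else if (m.group(2) != null) tokens.add(new String[] {"STR", m.group(2).replace("''", "'")});
            else if (m.group(3) != null) tokens.add(new String[] {"ID", m.group(3)});
            else tokens.add(new String[] {"OP", m.group(4)});
            i = m.end();
        }
    }

    /** Parses the whole expression; throws IllegalArgumentException on anything outside the subset. */
    static Predicate<Map<String, Object>> compile(String where) {
        WhereCompiler c = new WhereCompiler(where);
        Predicate<Map<String, Object>> p = c.parseOr();
        if (c.pos != c.tokens.size()) throw new IllegalArgumentException("Trailing input: " + c.tokens.get(c.pos)[1]);
        return p;
    }

    // or-expr := and-expr (OR and-expr)*
    private Predicate<Map<String, Object>> parseOr() {
        Predicate<Map<String, Object>> left = parseAnd();
        while (accept("ID", "OR")) left = left.or(parseAnd());
        return left;
    }

    // and-expr := primary (AND primary)*   (AND binds tighter than OR, as in SQL)
    private Predicate<Map<String, Object>> parseAnd() {
        Predicate<Map<String, Object>> left = parsePrimary();
        while (accept("ID", "AND")) left = left.and(parsePrimary());
        return left;
    }

    // primary := '(' or-expr ')' | column comparator literal | column REGEXP 'pattern'
    private Predicate<Map<String, Object>> parsePrimary() {
        if (accept("OP", "(")) {
            Predicate<Map<String, Object>> inner = parseOr();
            if (!accept("OP", ")")) throw new IllegalArgumentException("Missing ')'");
            return inner;
        }
        String[] col = next(), op = next(), lit = next();
        if (!col[0].equals("ID")) throw new IllegalArgumentException("Expected column, got " + col[1]);
        if (!lit[0].equals("NUM") && !lit[0].equals("STR")) throw new IllegalArgumentException("Expected literal, got " + lit[1]);
        String column = col[1];
        if (op[0].equals("ID") && op[1].equalsIgnoreCase("REGEXP")) {
            Pattern regex = Pattern.compile(lit[1]);
            // Substring match via find(); a missing column never matches.
            return row -> row.get(column) != null && regex.matcher(row.get(column).toString()).find();
        }
        if (!op[0].equals("OP") || !COMPARATORS.contains(op[1])) throw new IllegalArgumentException("Unsupported operator " + op[1]);
        String cmp = op[1], text = lit[1];
        boolean numeric = lit[0].equals("NUM");
        long number = numeric ? Long.parseLong(text) : 0L;
        return row -> {
            Object v = row.get(column);
            if (v == null) return false; // SQL NULL: the comparison is UNKNOWN, so the row is dropped
            // Numeric literals assume the column holds a Number; string literals compare toString().
            int c = numeric ? Long.compare(((Number) v).longValue(), number) : v.toString().compareTo(text);
            switch (cmp) {
                case "=": return c == 0;
                case "<>": return c != 0;
                case "<": return c < 0;
                case ">": return c > 0;
                case "<=": return c <= 0;
                default: return c >= 0; // ">=", the only remaining comparator
            }
        };
    }

    private String[] next() {
        if (pos >= tokens.size()) throw new IllegalArgumentException("Unexpected end of expression");
        return tokens.get(pos++);
    }

    private boolean accept(String kind, String text) {
        boolean ok = pos < tokens.size() && tokens.get(pos)[0].equals(kind) && tokens.get(pos)[1].equalsIgnoreCase(text);
        if (ok) pos++;
        return ok;
    }
}
```

In a job, the resulting predicate is not `Serializable`, so rather than capturing it in a lambda you would keep the WHERE string in a `RichFilterFunction` and call `WhereCompiler.compile` in `open()`. The `filter(...)` call is then an ordinary DataStream operator, so `.uid(...)` and `.name(...)` can be set on it as usual.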

My approach right now is to register my stream as a table, run a SQL query on it, and convert the result back to a DataStream, like so:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

List<SomePOJO> data = createPOJOTestData();
DataStream<SomePOJO> stream = env.fromCollection(data);

//final Table asTable = tEnv.fromDataStream(stream);
//Table filteredTable = asTable.where("user === 'pitter' && age > 10"); // NO SQL style 'AND' possible here...

tEnv.registerDataStream("SAMPLE", stream);
// USER is a reserved keyword in Flink SQL, so the column name is escaped with backticks
Table filteredTable = tEnv.sqlQuery("SELECT * FROM SAMPLE WHERE `user` = 'pitter' AND age > 10");

stream = tEnv.toAppendStream(filteredTable, SomePOJO.class);
List<SomePOJO> list = IteratorUtils.toList(DataStreamUtils.collect(stream));
//... test assertions

It feels a bit odd that I have to go all the way up to the SQL API and register a table "just" to apply a WHERE clause, and that, having left the DataStream world, I can no longer assign a uid or operator name to this operator.

Is this the best approach, or do you have a better idea? Are there any caveats? Note that I deliberately didn't assign an event-time column: I know it's just a WHERE clause without any windowing, and I wanted to test that it still works without an explicit time column :)

Best regards
Theo


Re: SQL Expression to Flink FilterFunction?

Leonard Xu
Hi, Theo

As far as I understand, this is currently hard to do in a DataStream application: converting a SQL expression into a Flink operator happens inside the underlying table planner (more precisely, in the code-generation phase), and the planner does not expose an interface for this to users. That is also why you cannot assign an operator name or operator ID.

Best,
Leonard Xu 

In reply to Theo Diefenthal <[hidden email]>, 5 June 2020, 00:18.



Re: SQL Expression to Flink FilterFunction?

Jark Wu-3
This is possible, but it may need some development. There is a similar utility in the table tests, `org.apache.flink.table.expressions.utils.ExpressionTestBase` [1], which translates expressions (either Table API expressions or SQL expressions) into a MapFunction.

I think you can imitate the approach of ExpressionTestBase to translate an expression into a FilterFunction.
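Following that suggestion in a DataStream job leaves one practical detail: a FilterFunction is shipped to the workers via Java serialization, whereas a compiled predicate (or, in the planner's case, as far as I know, generated code compiled with Janino) usually is not serializable. A common pattern is to serialize only the expression text and compile it on the worker. The sketch below shows this in plain Java without a Flink dependency; `ExpressionCompiler` and `LazyExpressionFilter` are hypothetical names, and the compiler is left pluggable (it might wrap a hand-written parser or code adapted from ExpressionTestBase). In Flink itself, the class would instead extend `RichFilterFunction` and compile in `open()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical: turns expression text into a predicate; must itself be serializable. */
interface ExpressionCompiler<T> extends Function<String, Predicate<T>>, Serializable {}

/** Hypothetical filter that serializes only the expression text and compiles it on first use. */
final class LazyExpressionFilter<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String expression;
    private final ExpressionCompiler<T> compiler;
    private transient Predicate<T> compiled; // skipped by serialization, rebuilt lazily afterwards

    LazyExpressionFilter(String expression, ExpressionCompiler<T> compiler) {
        this.expression = expression;
        this.compiler = compiler;
    }

    /** Same contract as FilterFunction#filter: returning true keeps the record. */
    boolean filter(T value) {
        if (compiled == null) {
            compiled = compiler.apply(expression);
        }
        return compiled.test(value);
    }

    /** Serializes and deserializes this object, standing in for shipping it to a worker. */
    @SuppressWarnings("unchecked")
    LazyExpressionFilter<T> roundTrip() throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(this);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyExpressionFilter<T>) in.readObject();
        }
    }
}
```

For example, with `ExpressionCompiler<String> c = prefix -> s -> s.startsWith(prefix);`, a filter built for "pit" keeps "pitter" both before and after `roundTrip()`. Compiling lazily means each parallel instance pays the compilation cost on its first record; compiling in `open()` moves that cost to task start-up instead.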

Best,
Jark


In reply to Leonard Xu <[hidden email]>, Fri, 5 Jun 2020, 10:17.