SQL Table API: Naming operations done in query


SQL Table API: Naming operations done in query

Juan Gentile

Hello,

We are currently testing the SQL API with Flink 1.4.0 and would like to know whether it is possible to name a query, or parts of it, so that we can easily recognize what it is doing when we run it.

An additional question: if we make small changes to the query (or queries), and assuming we are using checkpoints, how will Flink restore their state when we re-deploy?


Thank you,

Juan G.


Re: SQL Table API: Naming operations done in query

Timo Walther
Hi Juan,

Usually, the names of the Flink operators contain the optimized expression that was defined in SQL. You can also name the entire job using env.execute("Your Name") if that helps to identify the query.
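For example (a minimal sketch; it assumes a table "clicks" with a column userId has been registered beforehand):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

Table result = tableEnv.sql("SELECT userId, COUNT(*) FROM clicks GROUP BY userId");
// ... convert `result` to a DataStream and attach a sink here ...

// the name passed to execute() identifies the whole job, e.g. in the web UI
env.execute("click-counts-per-user");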

Regarding checkpoints, it depends on how you define "small changes". You must ensure that the execution plan (the Flink operator structure) remains unchanged. So it is no problem to change scalar function calls or the values of literals, but you should not change return types between operations.
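To make that concrete, a rough example (the "payments" table and its columns are made up):

// safe: only a literal changed; the optimized plan has the same structure
Table v1 = tableEnv.sql("SELECT id, amount FROM payments WHERE amount > 100");
Table v2 = tableEnv.sql("SELECT id, amount FROM payments WHERE amount > 500");

// not safe: the return type of the projection changes (numeric -> string),
// so the operator state in a checkpoint may no longer match
Table v3 = tableEnv.sql("SELECT id, CAST(amount AS VARCHAR) FROM payments WHERE amount > 100");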

I think we will have an additional page in the docs about that soon.

Regards,
Timo



Re: SQL Table API: Naming operations done in query

Juho Autio
Hi, have there been any changes to state handling with Flink SQL? Is anything planned?

I didn't find anything about it at
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html.


Recently I ran into problems when trying to restore the state after changes
that I thought wouldn't change the execution plan.

This is basically what I did:

// createKafkaStream, registerTable, and addSink are our own helper methods
SingleOutputStreamOperator kafkaStream = createKafkaStream(env);

// register the same stream under two names and run two SQL queries on it
registerTable("events1", kafkaStream, tableEnv);
Table result1 = tableEnv.sql(sql1);
addSink(result1);

registerTable("events2", kafkaStream, tableEnv);
Table result2 = tableEnv.sql(sql2);
addSink(result2);


Now, even though I created two different tables from kafkaStream, they end up in the same StreamTableEnvironment, and Flink optimizes the plan by combining the common parts of sql1 & sql2 :) That's of course nice when the stream DAG doesn't need to evolve. But if I wanted to force a "split" after kafkaStream, is there any way to do so without having to read from Kafka multiple times?

If I now add this:

registerTable("events3", kafkaStream, tableEnv);
Table result3 = tableEnv.sql(sql3);
addSink(result3);

..restoring from a savepoint fails, because Flink pushes, for example, the filters (WHERE conditions) of the new query down into the first common block of the SQL execution plan, so adding another branch to the DAG changes the common block, too. So even though this change looked like I was only adding something new, it also touched the existing parts of the DAG.
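One way to inspect this is to print the plan of each result table before and after the change; tableEnv.explain() returns the execution plan as a String:

// diff the output of these with and without the third query present
System.out.println(tableEnv.explain(result1));
System.out.println(tableEnv.explain(result3));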

I ended up using the Java API, where I define operators explicitly, even setting a uid on each operator to ensure savepoint compatibility. I would prefer to use SQL and the Table API instead, if it were also possible to restore state after this kind of incremental change.
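The explicit version looks roughly like this (a sketch; the Event type, the filter condition, and the names are made up):

// assume the Kafka stream carries Event objects
SingleOutputStreamOperator<Event> events = createKafkaStream(env);
events.uid("kafka-source"); // pin the source's operator ID

SingleOutputStreamOperator<Event> clicks = events
        .filter(e -> "click".equals(e.getType()))
        .uid("filter-clicks")          // stable ID for savepoint matching
        .name("Filter click events"); // display name in the web UI

clicks.addSink(createClickSink())
        .uid("click-sink");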




Re: SQL Table API: Naming operations done in query

Fabian Hueske-2
Hmmm, that's strange behavior that is unexpected (to me).
Flink optimizes the Table API / SQL queries when a Table is converted into a DataStream (or DataSet) or emitted to a TableSink.
So, given that you convert the result tables in addSink() into a DataStream and write them to a sink function, they should not share any operators or plan fragments.
Especially because you are registering new tables for each query, Calcite's optimizer does not even know that there is something to share.
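If addSink() internally does something like the following, each query should be optimized and translated independently at that point (Row used just as a placeholder for the result type):

// the query behind `result1` is optimized and translated here,
// when the Table is handed over to the DataStream API
DataStream<Row> stream = tableEnv.toAppendStream(result1, Row.class);
stream.addSink(new MySinkFunction()); // MySinkFunction is a placeholder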

Maybe the problem happens at the level of the DataStream API and some operators are chained?

Can you share the code of your addSink() function?

Thanks, Fabian


