Table Cache Problem

Yongsong He
Hi experts,
I want to cache a temporary table so that I can reuse it.

Flink version 1.10.1

The table is consumed from Kafka and has the following structure:
create table a (
field1 string,
field2 string,
field3 string,
field4 string
)

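The sample code looks like this:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val tableEnv = StreamTableEnvironment.create(env, settings)

val temptable = tableEnv.sqlQuery(s"select * from a where ${condition}")

temptable.where(condition1) // ... then do something
temptable.where(condition2) // ... then do something else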


I want to reuse temptable for better performance. Which operators do I need for that, or is it already cached in the Flink SQL plan?

Any help would be appreciated :)

Re: Table Cache Problem

Timo Walther
Hi Yongsong,

in newer Flink versions (1.11 and later) we introduced the concept of
statement sets, which are available via
`TableEnvironment.createStatementSet()`. They allow you to optimize a
branching pipeline as a whole, reusing common subplans.
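A minimal sketch of the statement-set approach, assuming Flink 1.11 to 1.13 with the Blink planner (as in the question). The Kafka table `a` is replaced by the built-in `datagen` connector, the two sinks `sink1`/`sink2` use the built-in `blackhole` connector, and all filter predicates are hypothetical placeholders for `${condition}`, `condition1` and `condition2`:

```scala
import org.apache.flink.table.api.{EnvironmentSettings, StatementSet, TableEnvironment}

object StatementSetReuse {
  // Registers a source, two sinks and the shared view, then groups both INSERTs.
  def buildStatementSet(): StatementSet = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
    val tableEnv = TableEnvironment.create(settings)

    // Hypothetical stand-in for the Kafka table `a` (built-in datagen connector).
    tableEnv.executeSql(
      """CREATE TABLE a (
        |  field1 STRING, field2 STRING, field3 STRING, field4 STRING
        |) WITH ('connector' = 'datagen', 'rows-per-second' = '10')""".stripMargin)

    // Hypothetical sinks for the two branches (built-in blackhole connector).
    tableEnv.executeSql(
      "CREATE TABLE sink1 (field1 STRING, field2 STRING) WITH ('connector' = 'blackhole')")
    tableEnv.executeSql(
      "CREATE TABLE sink2 (field1 STRING, field2 STRING) WITH ('connector' = 'blackhole')")

    // The shared subplan; `field3 IS NOT NULL` is a placeholder for ${condition}.
    val temptable = tableEnv.sqlQuery("SELECT field1, field2 FROM a WHERE field3 IS NOT NULL")
    tableEnv.createTemporaryView("temptable", temptable)

    // Both INSERTs are optimized together when the set is explained or executed.
    val stmtSet = tableEnv.createStatementSet()
    stmtSet.addInsertSql(
      "INSERT INTO sink1 SELECT field1, field2 FROM temptable WHERE field1 LIKE 'a%'")
    stmtSet.addInsertSql(
      "INSERT INTO sink2 SELECT field1, field2 FROM temptable WHERE field2 LIKE 'b%'")
    stmtSet
  }

  def main(args: Array[String]): Unit = {
    val stmtSet = buildStatementSet()
    println(stmtSet.explain()) // inspect the optimized plan for both branches
    stmtSet.execute()          // submits a single job containing both INSERTs
  }
}
```

Because both INSERTs are planned in one pass, the optimizer can share the common scan and filter. This relies on sub-plan reuse, which the Blink planner enables by default through `table.optimizer.reuse-sub-plan-enabled`; printing `explain()` is a simple way to check what was actually shared.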

In older Flink versions, you can convert the Table to a DataStream and
re-register it as a Table. In this case, the subplan is materialized
into a DataStream pipeline, and the planner treats it as a black box
that is shared by multiple branches.
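A minimal sketch of the DataStream round trip for Flink 1.10 with the Blink planner, assuming the Scala bridge API of that release (`org.apache.flink.table.api.scala`). The Kafka table `a` is replaced by a hypothetical in-memory stream, the shared subplan is converted to a tuple stream (`Rec` is a hypothetical alias), and the filters are placeholders for the conditions in the question:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object DataStreamReuse {
  // Hypothetical record type matching the four STRING columns of table `a`.
  type Rec = (String, String, String, String)

  // Builds two branches on top of one materialized subplan and returns the environment.
  def buildPipeline(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Hypothetical in-memory stand-in for the Kafka table `a`.
    val source: DataStream[Rec] =
      env.fromElements(("x1", "y1", "z1", "w1"), ("x2", "y2", "z2", "w2"))
    tableEnv.createTemporaryView("a", source, 'field1, 'field2, 'field3, 'field4)

    // The shared subplan; `field3 IS NOT NULL` is a placeholder for ${condition}.
    val temptable: Table = tableEnv.sqlQuery("SELECT * FROM a WHERE field3 IS NOT NULL")

    // Materialize the subplan once as a DataStream (fields mapped by position) ...
    val shared: DataStream[Rec] = tableEnv.toAppendStream[Rec](temptable)
    // ... and turn it back into a Table that the planner treats as an opaque input.
    val reused: Table = tableEnv.fromDataStream(shared, 'field1, 'field2, 'field3, 'field4)

    // Two branches reading the same upstream stream (placeholders for condition1/condition2).
    tableEnv.toAppendStream[Row](reused.where('field1 === "x1")).print()
    tableEnv.toAppendStream[Row](reused.where('field2 === "y2")).print()
    env
  }

  def main(args: Array[String]): Unit = {
    buildPipeline().execute("table-reuse-via-datastream")
  }
}
```

The trade-off of this design is that the planner cannot optimize across the DataStream boundary, so, for example, the branch filters are not pushed down into the shared query.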

I hope this helps.

Regards,
Timo


On 08.02.21 03:59, Yongsong He wrote:

> Hi experts,
> I want to cache a temporary table for reuse it
>
> Flink version 1.10.1
>
> the table is consumer from kafka,  struct like:
> create table a (
> field1 string,
> field2 string,
> field3 string,
> field4 string
> )
>
> the sample code looks like:
>
> val settings =
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
> val tableEnv = StreamTableEnvironment.create(env, settings)
>
> val temptable = tableEnv.sqlQuery(s"select * from a where ${condition}")
>
> temptable.where(condition1) ... then do something
> temptable.where(condition2) ... then do otherthing
>
>
> I want to reuse temptable for higher performance, what operators need or
> it already cached in flink sql plan ?
>
> Any help would be appreciated :)
>

Re: Table Cache Problem

Yongsong He
Thanks for your help, Timo, it is very helpful.

On Monday, February 8, 2021, Timo Walther <[hidden email]> wrote: