Hi all,
I noticed that DataStreamRel#translateToPlan is not idempotent, which can make the execution plan differ from what we expect. Every time we call DataStreamRel#translateToPlan (in TableEnvironment#explain, TableEnvironment#writeToSink, etc.), the same operators are added to the execution environment again.
Should we eliminate this side effect of DataStreamRel#translateToPlan?
Best, Wangsan
Appendix:
tenv.registerTableSource("test_source", sourceTable)
val t = tenv.sqlQuery("SELECT * from test_source")
println(tenv.explain(t))
println(tenv.explain(t))
implicit val typeInfo = TypeInformation.of(classOf[Row])
tenv.toAppendStream(t)
println(tenv.explain(t))
We call explain three times, and the physical execution plan is different each time.
LogicalProject(f1=[$0], f2=[$1])
LogicalTableScan(table=[[test_source]])
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 3 : Operator
content : Map
ship_strategy : FORWARD
LogicalProject(f1=[$0], f2=[$1])
LogicalTableScan(table=[[test_source]])
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 3 : Operator
content : Map
ship_strategy : FORWARD
Stage 4 : Data Source
content : collect elements with CollectionInputFormat
Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 6 : Operator
content : Map
ship_strategy : FORWARD
LogicalProject(f1=[$0], f2=[$1])
LogicalTableScan(table=[[test_source]])
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 3 : Operator
content : Map
ship_strategy : FORWARD
Stage 4 : Data Source
content : collect elements with CollectionInputFormat
Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 6 : Operator
content : Map
ship_strategy : FORWARD
Stage 7 : Data Source
content : collect elements with CollectionInputFormat
Stage 8 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 9 : Operator
content : Map
ship_strategy : FORWARD
Stage 10 : Operator
content : to: Row
ship_strategy : FORWARD
Stage 11 : Data Source
content : collect elements with CollectionInputFormat
Stage 12 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 13 : Operator
content : Map
ship_strategy : FORWARD
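To make the effect easier to see in isolation, here is a minimal toy model of the behavior. It is not Flink code: ToyEnv, ToyScan, translateOnce and IdempotenceDemo are hypothetical names invented for this sketch, and the three operator names only mirror the stages printed above. The sketch assumes that a translation simply appends operators to a shared environment, which is what the output above suggests.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-in for an execution environment: it only records operator names.
final class ToyEnv {
  val operators: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def addOperator(name: String): Unit = operators += name
}

// Toy stand-in for a plan node whose translation registers operators in the env.
final class ToyScan(env: ToyEnv) {
  // Side-effecting translation: every call appends the same three operators again.
  def translateToPlan(): Unit = {
    env.addOperator("Data Source")
    env.addOperator("CsvTableSource")
    env.addOperator("Map")
  }

  private var translated = false

  // One possible remedy (an assumption, not a proposal for the actual fix):
  // translate at most once and skip the side effect on later calls.
  def translateOnce(): Unit =
    if (!translated) {
      translateToPlan()
      translated = true
    }
}

object IdempotenceDemo {
  // Number of operators registered after translating `calls` times.
  def operatorCount(calls: Int, cached: Boolean): Int = {
    val env = new ToyEnv
    val scan = new ToyScan(env)
    (1 to calls).foreach { _ =>
      if (cached) scan.translateOnce() else scan.translateToPlan()
    }
    env.operators.size
  }

  def main(args: Array[String]): Unit = {
    println(operatorCount(calls = 3, cached = false)) // prints 9
    println(operatorCount(calls = 3, cached = true))  // prints 3
  }
}
```

With the uncached variant the operator count grows by three on every call, matching the 3, 6, ... stage pattern in the explain output above. Caching is only one way to remove the side effect; whether it fits the real DataStreamRel#translateToPlan is an open question.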