Side effect of DataStreamRel#translateToPlan

wangsan

Hi all,

I noticed that DataStreamRel#translateToPlan is not idempotent, and this can make the execution plan differ from what we expect. Every time DataStreamRel#translateToPlan is called (in TableEnvironment#explain, TableEnvironment#writeToSink, etc.), the same operators are added to the execution environment again.

Should we eliminate the side effect of DataStreamRel#translateToPlan?
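One possible way to remove the side effect, sketched on a toy model rather than Flink's real classes: let explain translate into a throwaway copy of the environment, so that only explicit sink or conversion calls mutate the user's environment. `Env`, `Query`, `explainWithSideEffect` and `explainPure` are hypothetical stand-ins, not Flink APIs.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the shared execution environment: it only records operator names.
final class Env {
  val operators: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

// Hypothetical stand-in for a DataStreamRel tree: translating appends its operators to `env`.
final case class Query(ops: Seq[String]) {
  def translateToPlan(env: Env): Unit = env.operators ++= ops
}

object ExplainSketch {
  // Behaviour described above: explain translates straight into the user's environment.
  def explainWithSideEffect(env: Env, q: Query): String = {
    q.translateToPlan(env)
    env.operators.mkString(" -> ")
  }

  // Side-effect-free variant: translate into a scratch copy and leave `env` untouched.
  def explainPure(env: Env, q: Query): String = {
    val scratch = new Env
    scratch.operators ++= env.operators // start from what is already registered
    q.translateToPlan(scratch)          // add this query to the copy only
    scratch.operators.mkString(" -> ")
  }

  def main(args: Array[String]): Unit = {
    val q = Query(Seq("Source", "Map"))

    val env1 = new Env
    explainWithSideEffect(env1, q)
    println(explainWithSideEffect(env1, q)) // Source -> Map -> Source -> Map

    val env2 = new Env
    explainPure(env2, q)
    println(explainPure(env2, q))           // Source -> Map
    println(env2.operators.isEmpty)         // true
  }
}
```

Caching the translated result per rel would be another option; the scratch copy is used here only because it keeps explain strictly read-only.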

Best,  Wangsan

Appendix:

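    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.types.Row

    tenv.registerTableSource("test_source", sourceTable)

    val t = tenv.sqlQuery("SELECT * from test_source")
    println(tenv.explain(t)) // 1st explain
    println(tenv.explain(t)) // 2nd explain: same query, but the plan has grown

    // toAppendStream needs an implicit TypeInformation for the target type
    implicit val typeInfo = TypeInformation.of(classOf[Row])
    tenv.toAppendStream[Row](t)
    println(tenv.explain(t)) // 3rd explain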

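We call explain three times, and each Physical Execution Plan is different: the first contains one copy of the source pipeline (stages 1 to 3), the second contains two (stages 1 to 6), and the third, after toAppendStream, contains four (stages 1 to 13), including the copy with the `to: Row` conversion.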
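The stage counts below can be reproduced with a toy model, assuming (as the stage numbering suggests) that every translation appends one more copy of the pipeline (Data Source, CsvTableSource, Map) to the shared environment, and that toAppendStream additionally appends the `to: Row` conversion. `ToyEnv` and its methods are hypothetical; this is not Flink code.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of the shared execution environment: it only tracks stage names.
final class ToyEnv {
  private val stages = ArrayBuffer.empty[String]

  // Mimics DataStreamRel#translateToPlan: every call appends a fresh copy of the pipeline.
  private def translateToPlan(): Unit =
    stages ++= Seq("Data Source", "CsvTableSource(read fields: f1, f2)", "Map")

  // Mimics TableEnvironment#explain: translate, then report every stage in the environment.
  def explain(): Int = { translateToPlan(); stages.size }

  // Mimics toAppendStream: translate, then append the conversion to Row.
  def toAppendStream(): Unit = { translateToPlan(); stages += "to: Row" }
}

object ReplayAppendix {
  def main(args: Array[String]): Unit = {
    val env = new ToyEnv
    val first = env.explain()  // 3 stages
    val second = env.explain() // 6 stages
    env.toAppendStream()       // 10 stages, nothing printed
    val third = env.explain()  // 13 stages
    println(Seq(first, second, third).mkString(", ")) // prints "3, 6, 13"
  }
}
```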

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 2 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 3 : Operator
            content : Map
            ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 2 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 3 : Operator
            content : Map
            ship_strategy : FORWARD

Stage 4 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 5 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 6 : Operator
            content : Map
            ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 2 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 3 : Operator
            content : Map
            ship_strategy : FORWARD

Stage 4 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 5 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 6 : Operator
            content : Map
            ship_strategy : FORWARD

Stage 7 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 8 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 9 : Operator
            content : Map
            ship_strategy : FORWARD

            Stage 10 : Operator
                content : to: Row
                ship_strategy : FORWARD

Stage 11 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 12 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 13 : Operator
            content : Map
            ship_strategy : FORWARD