Nondeterministic results with SQL job when parallelism is > 1


Nondeterministic results with SQL job when parallelism is > 1

Dylan Forciea

I am running Flink 1.12.2, and I was trying to increase the parallelism of my Flink SQL job to see what happened. However, once I did that, my results became nondeterministic. This happens whether I set the table.exec.resource.default-parallelism config option or set the default local parallelism to something higher than 1. I would end up with fewer records in the end, and each time I ran the job the output record count came out differently.

I managed to distill an example, pasted below (with attribute names changed to protect company proprietary info), that causes the issue. I believe I also got it to happen with a LEFT JOIN rather than a FULL JOIN, but the distilled version wasn't giving me wrong results with that. Maybe it has to do with joining to a table that was formed using a GROUP BY? Can somebody tell whether I'm doing something that is known not to work, or whether I have run across a bug?

 

Regards,

Dylan Forciea

 

 

// Imports assumed for the Flink 1.12 Scala API (omitted in the original post)
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Job {

  def main(args: Array[String]): Unit = {
    StreamExecutionEnvironment.setDefaultLocalParallelism(1)

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

    val configuration = streamTableEnv.getConfig().getConfiguration()
    configuration.setInteger("table.exec.resource.default-parallelism", 16)

    streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

    // Source tables: JDBC scans against Postgres
    streamTableEnv.executeSql(
      """
      CREATE TABLE table1 (
        id1 STRING PRIMARY KEY NOT ENFORCED,
        attr STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table1',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql(
      """
      CREATE TABLE table2 (
        attr STRING,
        id2 STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table2',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql(
      """
      CREATE TABLE table3 (
        attr STRING PRIMARY KEY NOT ENFORCED,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table3',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE sink (
        id STRING PRIMARY KEY NOT ENFORCED,
        attr STRING,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'sink',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    // FULL JOIN table1 against a GROUP BY aggregation of table2, then LEFT JOIN to table3
    val view =
      streamTableEnv.sqlQuery("""
      SELECT
        COALESCE(t1.id1, t2.id2) AS id,
        COALESCE(t2.attr, t1.attr) AS attr,
        COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
      FROM table1 t1
      FULL JOIN (
        SELECT
          id2,
          FIRST_VALUE(attr) AS attr
        FROM table2
        GROUP BY id2
      ) t2
        ON (t1.id1 = t2.id2)
      LEFT JOIN table3 t3
        ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
    streamTableEnv.createTemporaryView("view", view)

    val statementSet = streamTableEnv.createStatementSet()
    statementSet.addInsertSql("""
      INSERT INTO sink SELECT * FROM view
    """)

    statementSet.execute().await()
  }
}

 

 


Re: Nondeterministic results with SQL job when parallelism is > 1

Piotr Nowojski-4
Hi,

Yes, it looks like your query is non-deterministic because of `FIRST_VALUE` used inside `GROUP BY`. If you have many different parallel sources, each time you run your query the first value might be different. If that's the case, you could try to confirm it with an even smaller query:

        SELECT
          id2,
          FIRST_VALUE(attr) AS attr
        FROM table2
        GROUP BY id2

Best,
Piotrek



Re: Nondeterministic results with SQL job when parallelism is > 1

Dylan Forciea

Piotrek,

 

I was actually originally using a group function that WAS deterministic (a custom UDF I wrote), but chose something built in here. By non-deterministic, I mean that the number of records coming out is not consistent. Since the FIRST_VALUE here is on an attribute that is not part of the key, that shouldn't affect the number of records coming out, I wouldn't think.

 

Dylan

 



Re: Nondeterministic results with SQL job when parallelism is > 1

Timo Walther
Hi Dylan,

streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

is currently not supported by the Table & SQL API. For now,

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

determines the mode. Thus, I would remove the line again.

If you want to use `inBatchMode()`, you can use the unified
TableEnvironment that is not connected to the StreamExecutionEnvironment:

TableEnvironment.create(settings);
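
For example, a minimal sketch of that unified batch setup (untested, assuming the same DDL and statement set as in your job) could look like:

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    // Unified TableEnvironment: there is no StreamExecutionEnvironment here,
    // so no setRuntimeMode(...) call is needed (or possible).
    val batchSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    val tableEnv = TableEnvironment.create(batchSettings)

    // The CREATE TABLE statements and the statement set would then be created
    // from this tableEnv instead of the StreamTableEnvironment.
    tableEnv.getConfig().getConfiguration()
      .setInteger("table.exec.resource.default-parallelism", 16)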

So Piotrek's answer hopefully makes more sense now.

Regards,
Timo




Re: Nondeterministic results with SQL job when parallelism is > 1

Dylan Forciea

I replaced the FIRST_VALUE with MAX to ensure that the results should be identical even in their content, and my problem still remains: I end up with a nondeterministic count of records being emitted into the sink when the parallelism is over 1, and that count is about 20-25% lower (and not consistent) than what comes out consistently when parallelism is set to 1.
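
For reference, the aggregation subquery now reads (same schema as before):

        SELECT
          id2,
          MAX(attr) AS attr
        FROM table2
        GROUP BY id2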

 

Dylan

 



Re: Nondeterministic results with SQL job when parallelism is > 1

Piotr Nowojski-4
Hi Dylan,

But if you are running your query in streaming mode, aren't you counting retractions from the FULL JOIN? AFAIK, in streaming mode, when the first record comes into a FULL JOIN it will be immediately emitted with NULLs (not matched, as the other table is empty). Later, if a matching record is received from the second table, the previous result will be retracted and the new, updated one will be re-emitted. Maybe this is what you are observing in the varying output?

Maybe you could try to analyse how the results differ between different runs?
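
For example (just a rough sketch against the 1.12 Scala bridge API, not something I have run), you could print the changelog of your view instead of writing it to the JDBC sink and look for retraction messages:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.types.Row

    // (true, row) is an insert/update-after, (false, row) retracts a previously emitted row
    val changes = streamTableEnv.toRetractStream[Row](view)
    changes.print()
    streamEnv.execute("inspect-retractions")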

Best,
Piotrek



Re: Nondeterministic results with SQL job when parallelism is > 1

Dylan Forciea

Piotrek,

 

I am looking at the count of records present in the sink table in Postgres after the entire job completes, not the number of inserts/retracts. I can see as the job runs that records are added and removed from the “sink” table. With parallelism set to 1, it always comes out to the same number (which is consistent with the number of ids in the source tables “table1” and “table2”), at about 491k records in table “sink” when the job is complete. With the parallelism set to 16, the “sink” table will have somewhere around 360k records +/- 20k when the job is complete. I truncate the “sink” table before I run the job, and this is a test environment where the source databases are static.

 

I removed my line for setting batch mode per Timo's suggestion, and am still running with MAX, which should have deterministic output.

 

Dylan

 



Re: Nondeterministic results with SQL job when parallelism is > 1

Dylan Forciea
Timo,

Here is the plan (hopefully I properly cleansed it of company proprietary info without garbling it).

Dylan
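
For reference, the changelogMode details below were presumably produced by requesting the changelog detail in the explain call, roughly:

    import org.apache.flink.table.api.ExplainDetail

    // print the plan including per-operator changelog modes (I, UB, UA, D)
    println(statementSet.explain(ExplainDetail.CHANGELOG_MODE))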

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
+- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS NOT NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT NULL($3), $3, $1)])
   +- LogicalJoin(condition=[=($4, $5)], joinType=[left])
      :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], $f4=[CASE(IS NOT NULL($3), $3, $1)])
      :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
      :     :- LogicalTableScan(table=[[default_catalog, default_database, table1]])
      :     +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
      :        +- LogicalProject(id2=[$1], attr=[$0])
      :           +- LogicalTableScan(table=[[default_catalog, default_database, table2]])
      +- LogicalTableScan(table=[[default_catalog, default_database, table3]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped], changelogMode=[NONE])
+- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), attr_mapped, IS NOT NULL(attr0), attr0, attr) AS attr_mapped], changelogMode=[I,UB,UA,D])
   +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
      :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
      :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT NULL(attr0), attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
      :     +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
      :        :- Exchange(distribution=[hash[id1]], changelogMode=[I])
      :        :  +- TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr], changelogMode=[I])
      :        +- Exchange(distribution=[hash[id2]], changelogMode=[I,UB,UA])
      :           +- GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr], changelogMode=[I,UB,UA])
      :              +- Exchange(distribution=[hash[id2]], changelogMode=[I])
      :                 +- TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2], changelogMode=[I])
      +- Exchange(distribution=[hash[attr]], changelogMode=[I])
         +- TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped], changelogMode=[I])

== Physical Execution Plan ==
Stage 1 : Data Source
        content : Source: TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr])

Stage 3 : Data Source
        content : Source: TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2])

        Stage 5 : Attr
                content : GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr])
                ship_strategy : HASH

                Stage 7 : Attr
                        content : Join(joinType=[FullOuterJoin], where=[(id1 = id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
                        ship_strategy : HASH

                        Stage 8 : Attr
                                content : Calc(select=[id1, attr, id2, attr0, (attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4])
                                ship_strategy : FORWARD

Stage 10 : Data Source
        content : Source: TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped])

        Stage 12 : Attr
                content : Join(joinType=[LeftOuterJoin], where=[($f4 = attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
                ship_strategy : HASH

                Stage 13 : Attr
                        content : Calc(select=[(id1 IS NOT NULL CASE id1 CASE id2) AS id, (attr0 IS NOT NULL CASE attr0 CASE attr) AS attr, (attr_mapped IS NOT NULL CASE attr_mapped CASE attr0 IS NOT NULL CASE attr0 CASE attr) AS attr_mapped])
                        ship_strategy : FORWARD

                        Stage 14 : Data Sink
                                content : Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
                                ship_strategy : FORWARD

On 4/14/21, 10:08 AM, "Timo Walther" <[hidden email]> wrote:

    Can you share the resulting plan with us? Ideally with the ChangelogMode
    detail enabled as well.

    statementSet.explain(...)

    Maybe this could help.

    Regards,
    Timo





Re: Nondeterministic results with SQL job when parallelism is > 1

Dylan Forciea
On a side note, I changed to batch mode per your suggestion, Timo, and my job ran much faster and produced deterministic counts with parallelism turned on. So I'll probably utilize that for now. However, it would still be nice to dig into why streaming isn't working, in case I need it in the future.

Dylan

    ship_strategy : HASH

    Stage 13 : Attr
    content : Calc(select=[(id1 IS NOT NULL CASE id1 CASE id2) AS id, (attr0 IS NOT NULL CASE attr0 CASE attr) AS attr, (attr_mapped IS NOT NULL CASE attr_mapped CASE attr0 IS NOT NULL CASE attr0 CASE attr) AS attr_mapped])
    ship_strategy : FORWARD

    Stage 14 : Data Sink
    content : Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
    ship_strategy : FORWARD
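
    (A minimal sketch of the explain call that produces a plan with these changelogMode annotations, assuming the statementSet built in the job; ExplainDetail comes from the Flink 1.12 Table API:)

        import org.apache.flink.table.api.ExplainDetail

        // Prints the AST, the optimized plan annotated with changelogMode=[...],
        // and the physical execution plan for every statement added to the set.
        println(statementSet.explain(ExplainDetail.CHANGELOG_MODE))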

    On 4/14/21, 10:08 AM, "Timo Walther" <[hidden email]> wrote:

        Can you share the resulting plan with us? Ideally with the ChangelogMode
        detail enabled as well.

        statementSet.explain(...)

        Maybe this could help.

        Regards,
        Timo



        On 14.04.21 16:47, Dylan Forciea wrote:
        > Piotrek,
        >
        > I am looking at the count of records present in the sink table in
        > Postgres after the entire job completes, not the number of
        > inserts/retracts. I can see as the job runs that records are added and
        > removed from the “sink” table. With parallelism set to 1, it always
        > comes out to the same number (which is consistent with the number of ids
        > in the source tables “table1” and “table2”), at about 491k records in
        > table “sink” when the job is complete. With the parallelism set to 16,
        > the “sink” table will have somewhere around 360k records +/- 20k when
        > the job is complete. I truncate the “sink” table before I run the job,
        > and this is a test environment where the source databases are static.
        >
        > I removed my line for setting to Batch mode per Timo’s suggestion, and
        > am still running with MAX which should have deterministic output.
        >
        > Dylan
        >
        > *From: *Piotr Nowojski <[hidden email]>
        > *Date: *Wednesday, April 14, 2021 at 9:38 AM
        > *To: *Dylan Forciea <[hidden email]>
        > *Cc: *"[hidden email]" <[hidden email]>
        > *Subject: *Re: Nondeterministic results with SQL job when parallelism is > 1
        >
        > Hi Dylan,
        >
        > But if you are running your query in Streaming mode, aren't you counting
        > retractions from the FULL JOIN? AFAIK in Streaming mode in FULL JOIN,
        > when the first record comes in it will be immediately emitted with NULLs
        > (not matched, as the other table is empty). Later if a matching record
        > is received from the second table, the previous result will be retracted
        > and the new one, updated, will be re-emitted. Maybe this is what you are
        > observing in the varying output?
        >
        > Maybe you could try to analyse how the results differ between different
        > runs?
        >
        > Best,
        >
        > Piotrek
        >
        > śr., 14 kwi 2021 o 16:22 Dylan Forciea <[hidden email]
        > <mailto:[hidden email]>> napisał(a):
        >
        >     I replaced the FIRST_VALUE with MAX to ensure that the results
        >     should be identical even in their content, and my problem still
        >     remains – I end up with a nondeterministic count of records being
        >     emitted into the sink when the parallelism is over 1, and that count
        >     is about 20-25% short (and not consistent) of what comes out
        >     consistently when parallelism is set to 1.
        >
        >     Dylan
        >
        >     *From: *Dylan Forciea <[hidden email] <mailto:[hidden email]>>
        >     *Date: *Wednesday, April 14, 2021 at 9:08 AM
        >     *To: *Piotr Nowojski <[hidden email]
        >     <mailto:[hidden email]>>
        >     *Cc: *"[hidden email] <mailto:[hidden email]>"
        >     <[hidden email] <mailto:[hidden email]>>
        >     *Subject: *Re: Nondeterministic results with SQL job when
        >     parallelism is > 1
        >
        >     Piotrek,
        >
        >     I was actually originally using a group function that WAS
        >     deterministic (but was a custom UDF I made), but chose something
        >     here built in. By non-deterministic, I mean that the number of
        >     records coming out is not consistent. Since the FIRST_VALUE here is
        >     on an attribute that is not part of the key, that shouldn’t affect
        >     the number of records coming out I wouldn’t think.
        >
        >     Dylan
        >
        >     *From: *Piotr Nowojski <[hidden email]
        >     <mailto:[hidden email]>>
        >     *Date: *Wednesday, April 14, 2021 at 9:06 AM
        >     *To: *Dylan Forciea <[hidden email] <mailto:[hidden email]>>
        >     *Cc: *"[hidden email] <mailto:[hidden email]>"
        >     <[hidden email] <mailto:[hidden email]>>
        >     *Subject: *Re: Nondeterministic results with SQL job when
        >     parallelism is > 1
        >
        >     Hi,
        >
        >     Yes, it looks like your query is non deterministic because of
        >     `FIRST_VALUE` used inside `GROUP BY`. If you have many different
        >     parallel sources, each time you run your query your first value
        >     might be different. If that's the case, you could try to confirm it
        >     with even smaller query:
        >
        >             SELECT
        >                id2,
        >                FIRST_VALUE(attr) AS attr
        >              FROM table2
        >              GROUP BY id2
        >
        >     Best,
        >
        >     Piotrek
        >




Re: Nondeterministic results with SQL job when parallelism is > 1

Jark Wu-3
Hi Dylan,

I think this has the same reason as https://issues.apache.org/jira/browse/FLINK-20374
The root cause is that changelogs are shuffled by `attr` at the second join,
and thus records with the same `id` will be shuffled to different join tasks (and to different sink tasks).
So the data arriving at the sinks is not ordered by the sink primary key.

We may need something like a primary-key ordering mechanism in the whole planner to fix this.

Best,
Jark
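
A hypothetical trace of how that can drop rows at the sink (values invented for illustration, following the plan above):

    1. The aggregate first produces (id2 = 'X', attr = 'A'); the joins emit an insert for id 'X',
       which is hashed on $f4 = 'A' and handled by, say, join/sink subtask 3.
    2. A later table2 row changes the aggregate result for 'X' to attr = 'B'; the retraction of the
       old row still hashes to 'A' (subtask 3), while the new row hashes to 'B' (subtask 7).
    3. There is no ordering guarantee between subtasks 3 and 7, so the DELETE for primary key 'X'
       can be applied in Postgres after the new upsert, leaving id 'X' missing from the sink.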





Re: Nondeterministic results with SQL job when parallelism is > 1

Dylan Forciea

Jark,

 

Thanks for the heads up! I didn’t see this behavior when running in batch mode with parallelism turned on. Is it safe to do this kind of join in batch mode right now, or am I just getting lucky?

 

Dylan

 




Re: Nondeterministic results with SQL job when parallelism is > 1

Jark Wu-3
Hi Dylan,

The primary key ordering problem I mentioned above is about the changelog. Batch queries only emit a final result and thus don't have a changelog, so it's safe to use batch mode.

The problem only exists in streaming mode with parallelism greater than 1.

Best,
Jark
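
Either way, the expected final count can be sanity-checked directly against Postgres (a sketch; since the FULL JOIN is on the id columns, the sink should end up with one row per distinct id across table1 and table2):

    -- expected number of sink rows
    SELECT COUNT(*) FROM (
      SELECT id1 AS id FROM table1
      UNION
      SELECT id2 FROM table2
    ) ids;

    -- actual number of rows written by the job
    SELECT COUNT(*) FROM sink;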

On Fri, 16 Apr 2021 at 21:40, Dylan Forciea <[hidden email]> wrote:

Jark,

 

Thanks for the heads up! I didn’t see this behavior when running in batch mode with parallelism turned on. Is it safe to do this kind of join in batch mode right now, or am I just getting lucky?

 

Dylan

 

From: Jark Wu <[hidden email]>
Date: Friday, April 16, 2021 at 5:10 AM
To: Dylan Forciea <[hidden email]>
Cc: Timo Walther <[hidden email]>, Piotr Nowojski <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

 

HI Dylan,

 

I think this has the same reason as https://issues.apache.org/jira/browse/FLINK-20374

The root cause is that changelogs are shuffled by `attr` at second join, 

and thus records with the same `id` will be shuffled to different join tasks (also different sink tasks). 

So the data arrived at sinks are not ordered on the sink primary key.

 

We may need something like primary key ordering mechanism in the whole planner to fix this.

 

Best,

Jark

 

On Thu, 15 Apr 2021 at 01:33, Dylan Forciea <[hidden email]> wrote:

On a side note - I changed to use the batch mode per your suggestion Timo, and my job ran much faster and with deterministic counts with parallelism turned on. So I'll probably utilize that for now. However, it would still be nice to dig down into why streaming isn't working in case I need that in the future.

Dylan

On 4/14/21, 10:27 AM, "Dylan Forciea" <[hidden email]> wrote:

    Timo,

    Here is the plan (hopefully I properly cleansed it of company proprietary info without garbling it)

    Dylan

    == Abstract Syntax Tree ==
    LogicalSink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
    +- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS NOT NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT NULL($3), $3, $1)])
       +- LogicalJoin(condition=[=($4, $5)], joinType=[left])
          :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], $f4=[CASE(IS NOT NULL($3), $3, $1)])
          :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
          :     :- LogicalTableScan(table=[[default_catalog, default_database, table1]])
          :     +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
          :        +- LogicalProject(id2=[$1], attr=[$0])
          :           +- LogicalTableScan(table=[[default_catalog, default_database, table2]])
          +- LogicalTableScan(table=[[default_catalog, default_database, table3]])

    == Optimized Logical Plan ==
    Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped], changelogMode=[NONE])
    +- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), attr_mapped, IS NOT NULL(attr0), attr0, attr) AS attr_mapped], changelogMode=[I,UB,UA,D])
       +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
          :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
          :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT NULL(attr0), attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
          :     +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
          :        :- Exchange(distribution=[hash[id1]], changelogMode=[I])
          :        :  +- TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr], changelogMode=[I])
          :        +- Exchange(distribution=[hash[id2]], changelogMode=[I,UB,UA])
          :           +- GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr], changelogMode=[I,UB,UA])
          :              +- Exchange(distribution=[hash[id2]], changelogMode=[I])
          :                 +- TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2], changelogMode=[I])
          +- Exchange(distribution=[hash[attr]], changelogMode=[I])
             +- TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped], changelogMode=[I])

    == Physical Execution Plan ==
    Stage 1 : Data Source
        content : Source: TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr])

    Stage 3 : Data Source
        content : Source: TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2])

        Stage 5 : Attr
                content : GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr])
                ship_strategy : HASH

                Stage 7 : Attr
                        content : Join(joinType=[FullOuterJoin], where=[(id1 = id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
                        ship_strategy : HASH

                        Stage 8 : Attr
                                content : Calc(select=[id1, attr, id2, attr0, (attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4])
                                ship_strategy : FORWARD

    Stage 10 : Data Source
        content : Source: TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped])

        Stage 12 : Attr
                content : Join(joinType=[LeftOuterJoin], where=[($f4 = attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
                ship_strategy : HASH

                Stage 13 : Attr
                        content : Calc(select=[(id1 IS NOT NULL CASE id1 CASE id2) AS id, (attr0 IS NOT NULL CASE attr0 CASE attr) AS attr, (attr_mapped IS NOT NULL CASE attr_mapped CASE attr0 IS NOT NULL CASE attr0 CASE attr) AS attr_mapped])
                        ship_strategy : FORWARD

                        Stage 14 : Data Sink
                                content : Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
                                ship_strategy : FORWARD

    On 4/14/21, 10:08 AM, "Timo Walther" <[hidden email]> wrote:

        Can you share the resulting plan with us? Ideally with the ChangelogMode
        detail enabled as well.

        statementSet.explain(...)

        Maybe this could help.
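
        For example, a minimal sketch (assuming the statement set from your job; ExplainDetail.CHANGELOG_MODE is what adds the changelogMode annotations to the printed plan):

            import org.apache.flink.table.api.ExplainDetail

            // prints the AST, the optimized plan, and the physical plan, with the
            // changelog mode (I, UB, UA, D) annotated on each operator
            println(statementSet.explain(ExplainDetail.CHANGELOG_MODE))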

        Regards,
        Timo



        On 14.04.21 16:47, Dylan Forciea wrote:
        > Piotrek,
        >
        > I am looking at the count of records present in the sink table in
        > Postgres after the entire job completes, not the number of
        > inserts/retracts. I can see as the job runs that records are added and
        > removed from the “sink” table. With parallelism set to 1, it always
        > comes out to the same number (which is consistent with the number of ids
        > in the source tables “table1” and “table2”), at about 491k records in
        > table “sink” when the job is complete. With the parallelism set to 16,
        > the “sink” table will have somewhere around 360k records +/- 20k when
        > the job is complete. I truncate the “sink” table before I run the job,
        > and this is a test environment where the source databases are static.
        >
        > I removed my line for setting to Batch mode per Timo’s suggestion, and
        > am still running with MAX, which should have deterministic output.
        >
        > Dylan
        >
        > *From: *Piotr Nowojski <[hidden email]>
        > *Date: *Wednesday, April 14, 2021 at 9:38 AM
        > *To: *Dylan Forciea <[hidden email]>
        > *Cc: *"[hidden email]" <[hidden email]>
        > *Subject: *Re: Nondeterministic results with SQL job when parallelism is > 1
        >
        > Hi Dylan,
        >
        > But if you are running your query in Streaming mode, aren't you counting
        > retractions from the FULL JOIN? AFAIK in Streaming mode in FULL JOIN,
        > when the first record comes in it will be immediately emitted with NULLs
        > (not matched, as the other table is empty). Later if a matching record
        > is received from the second table, the previous result will be retracted
        > and the new, updated one will be re-emitted. Maybe this is what you are
        > observing in the varying output?
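        >
        > For illustration (made-up values, using Flink's changelog row kinds), once a
        > matching row from the aggregated side arrives for an id that was already
        > emitted, the full join could emit something like:
        >
        >     +I (id1='x', attr='a', id2=NULL, attr0=NULL)   // left row arrives first, padded with NULLs
        >     -U (id1='x', attr='a', id2=NULL, attr0=NULL)   // the NULL-padded result is retracted
        >     +U (id1='x', attr='a', id2='x',  attr0='b')    // the matched, updated result is emitted
        >
        > so a single source row can account for several changelog records downstream.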
        >
        > Maybe you could try to analyse how the results differ between different
        > runs?
        >
        > Best,
        >
        > Piotrek
        >
        > śr., 14 kwi 2021 o 16:22 Dylan Forciea <[hidden email]
        > <mailto:[hidden email]>> napisał(a):
        >
        >     I replaced the FIRST_VALUE with MAX to ensure that the results
        >     should be identical even in their content, and my problem still
        >     remains – I end up with a nondeterministic count of records being
        >     emitted into the sink when the parallelism is over 1, and that count
        >     is about 20-25% short (and not consistent) of what comes out
        >     consistently when parallelism is set to 1.
        >
        >     Dylan
        >
        >     *From: *Dylan Forciea <[hidden email] <mailto:[hidden email]>>
        >     *Date: *Wednesday, April 14, 2021 at 9:08 AM
        >     *To: *Piotr Nowojski <[hidden email]
        >     <mailto:[hidden email]>>
        >     *Cc: *"[hidden email] <mailto:[hidden email]>"
        >     <[hidden email] <mailto:[hidden email]>>
        >     *Subject: *Re: Nondeterministic results with SQL job when
        >     parallelism is > 1
        >
        >     Piotrek,
        >
        >     I was actually originally using a group function that WAS
        >     deterministic (but was a custom UDF I made), but chose something
        >     here built in. By non-deterministic, I mean that the number of
        >     records coming out is not consistent. Since the FIRST_VALUE here is
        >     on an attribute that is not part of the key, that shouldn’t affect
        >     the number of records coming out I wouldn’t think.
        >
        >     Dylan
        >
        >     *From: *Piotr Nowojski <[hidden email]
        >     <mailto:[hidden email]>>
        >     *Date: *Wednesday, April 14, 2021 at 9:06 AM
        >     *To: *Dylan Forciea <[hidden email] <mailto:[hidden email]>>
        >     *Cc: *"[hidden email] <mailto:[hidden email]>"
        >     <[hidden email] <mailto:[hidden email]>>
        >     *Subject: *Re: Nondeterministic results with SQL job when
        >     parallelism is > 1
        >
        >     Hi,
        >
        >     Yes, it looks like your query is non-deterministic because of
        >     `FIRST_VALUE` used inside `GROUP BY`. If you have many different
        >     parallel sources, each time you run your query your first value
        >     might be different. If that's the case, you could try to confirm it
        >     with an even smaller query:
        >
        >             SELECT
        >                id2,
        >                FIRST_VALUE(attr) AS attr
        >              FROM table2
        >              GROUP BY id2
        >
        >     Best,
        >
        >     Piotrek
        >
        >     śr., 14 kwi 2021 o 14:45 Dylan Forciea <[hidden email]
        >     <mailto:[hidden email]>> napisał(a):
        >
        >         I am running Flink 1.12.2, and I was trying to up the
        >         parallelism of my Flink SQL job to see what happened. However,
        >         once I did that, my results became nondeterministic. This
        >         happens whether I set the
        >         table.exec.resource.default-parallelism config option or I set
        >         the default local parallelism to something higher than 1. I
        >         would end up with less records in the end, and each time I ran
        >         the output record count would come out differently.
        >
        >         I managed to distill an example, as pasted below (with attribute
        >         names changed to protect company proprietary info), that causes
        >         the issue. I feel like I managed to get it to happen with a LEFT
        >         JOIN rather than a FULL JOIN, but the distilled version wasn’t
        >         giving me wrong results with that. Maybe it has to do with
        >         joining to a table that was formed using a GROUP BY? Can
        >         somebody tell if I’m doing something that is known not to work,
        >         or if I have run across a bug?
        >
        >         Regards,
        >
        >         Dylan Forciea
        >
        >         object Job {
        >
        >         def main(args: Array[String]): Unit = {
        >
        >         StreamExecutionEnvironment.setDefaultLocalParallelism(1)
        >
        >         val settings =
        >         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        >
        >         val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        >
        >         val streamTableEnv = StreamTableEnvironment.create(streamEnv,
        >         settings)
        >
        >         val configuration = streamTableEnv.getConfig().getConfiguration()
        >
        >             
        >         configuration.setInteger("table.exec.resource.default-parallelism",
        >         16)
        >
        >              streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
        >
        >              streamTableEnv.executeSql(
        >
        >         """
        >
        >                CREATE TABLE table1 (
        >
        >                  id1 STRING PRIMARY KEY NOT ENFORCED,
        >
        >                  attr STRING
        >
        >                ) WITH (
        >
        >                  'connector' = 'jdbc',
        >
        >                  'url' = 'jdbc:postgresql://…',
        >
        >                  'table-name' = 'table1',
        >
        >                  'username' = 'username',
        >
        >                  'password' = 'password',
        >
        >                  'scan.fetch-size' = '500',
        >
        >                  'scan.auto-commit' = 'false'
        >
        >                )""")
        >
        >              streamTableEnv.executeSql(
        >
        >         """
        >
        >                CREATE TABLE table2 (
        >
        >                  attr STRING,
        >
        >                  id2 STRING
        >
        >                ) WITH (
        >
        >                  'connector' = 'jdbc',
        >
        >                  'url' = 'jdbc:postgresql://…',
        >
        >                  'table-name' = 'table2',
        >
        >                  'username' = 'username',
        >
        >                  'password' = 'password',
        >
        >                  'scan.fetch-size' = '500',
        >
        >                  'scan.auto-commit' = 'false'
        >
        >                )""")
        >
        >              streamTableEnv.executeSql(
        >
        >         """
        >
        >                CREATE TABLE table3 (
        >
        >                  attr STRING PRIMARY KEY NOT ENFORCED,
        >
        >                  attr_mapped STRING
        >
        >                ) WITH (
        >
        >                  'connector' = 'jdbc',
        >
        >                  'url' = 'jdbc:postgresql://…',
        >
        >                  'table-name' = 'table3',
        >
        >                  'username' = 'username',
        >
        >                  'password' = 'password',
        >
        >                  'scan.fetch-size' = '500',
        >
        >                  'scan.auto-commit' = 'false'
        >
        >                )""")
        >
        >              streamTableEnv.executeSql("""
        >
        >                CREATE TABLE sink (
        >
        >                  id STRING PRIMARY KEY NOT ENFORCED,
        >
        >                  attr STRING,
        >
        >                  attr_mapped STRING
        >
        >                ) WITH (
        >
        >                  'connector' = 'jdbc',
        >
        >                  'url' = 'jdbc:postgresql://…',
        >
        >                  'table-name' = 'sink',
        >
        >                  'username' = 'username',
        >
        >                  'password' = 'password',
        >
        >                  'scan.fetch-size' = '500',
        >
        >                  'scan.auto-commit' = 'false'
        >
        >                )""")
        >
        >         val view =
        >
        >                streamTableEnv.sqlQuery("""
        >
        >                SELECT
        >
        >                  COALESCE(t1.id1, t2.id2) AS id,
        >
        >                  COALESCE(t2.attr, t1.attr) AS attr,
        >
        >                  COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
        >
        >                FROM table1 t1
        >
        >                FULL JOIN (
        >
        >                  SELECT
        >
        >                    id2,
        >
        >                    FIRST_VALUE(attr) AS attr
        >
        >                  FROM table2
        >
        >                  GROUP BY id2
        >
        >                ) t2
        >
        >                 ON (t1.id1 = t2.id2)
        >
        >                LEFT JOIN table3 t3
        >
        >                  ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
        >
        >              streamTableEnv.createTemporaryView("view", view)
        >
        >         val statementSet = streamTableEnv.createStatementSet()
        >
        >              statementSet.addInsertSql("""
        >
        >                INSERT INTO sink SELECT * FROM view
        >
        >              """)
        >
        >              statementSet.execute().await()
        >
        >            }
        >
        >         }
        >