Hello, I'm running into an issue where my execution plan creates the exact same join operator multiple times, simply because the operator that follows each copy filters on a different boolean value. This is a massive duplication of storage and work, and the filters that follow only remove a small set of elements from each copy anyway. For example, here are two separate operators that are equal:
Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
organization_id, user_id, roles, id_splatted, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent])

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
organization_id, user_id, roles, id_splatted, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent])

These two operators process entirely the same dataset. The first one points to:
GroupAggregate(groupBy=[user_id], select=[user_id,
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
TMP_0.f0 AS admin_organization_ids])

The second one points to:
GroupAggregate(groupBy=[user_id], select=[user_id,
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
TMP_0.f0 AS teacher_organization_ids])

These two aggregates consume intersecting, though slightly different, sets of data, but I don't see why that should cause the single join upstream to be split into two. There's even a case where I'm seeing a join tripled. Is there a good reason why this happens? Is there a way to tell Flink not to duplicate operators where it doesn't need to? Thanks!

-- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com
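For reference, the Blink planner exposes optimizer options that control whether identical sub-plans and sources are merged rather than duplicated. A minimal sketch of setting them explicitly (assuming a Blink-planner TableEnvironment named tableEnv; both options already default to true, so this mainly rules out that they were disabled somewhere else) looks like this:

// Sketch only: explicitly enable sub-plan and source reuse on the table config.
// The option keys are the Blink planner's "table.optimizer.reuse-sub-plan-enabled"
// and "table.optimizer.reuse-source-enabled"; both default to true.
tableEnv.getConfig.getConfiguration
  .setBoolean("table.optimizer.reuse-sub-plan-enabled", true)
tableEnv.getConfig.getConfiguration
  .setBoolean("table.optimizer.reuse-source-enabled", true)

Note that sub-plan reuse only applies to branches the optimizer sees within a single optimized plan, so how the queries are translated and submitted can still affect whether common operators end up shared.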
Hi Rex,

Could you also attach an example of the SQL / Table code for this? And one thing to confirm: do the operators with the same names also have the same inputs?

Best,
Yun
Yes, the same exact input operators go into both joins. The chunk of code for the joins from the specific part of the plan I showed is as follows. The orgUsersTable is later filtered into one table and aggregated, and into another table and aggregated. The planner seems to duplicate orgUsersTable into two operators even though I create only one of it.

// in the main function
val orgUsersTable = splatRoles(
  this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
  OrgUsersRoleSplatPrefix,
  this.tableEnv
)

// helper function
def splatRoles(
    table: Table,
    columnPrefix: String,
    tableEnv: TableEnvironment
): Table = {
  // Flink does not have a contains function so we have to splat out our role array's contents
  // and join it to the originating table.
  val func = new SplatRolesFunc()
  val splatted = table
    .map(func($"roles", $"id"))
    .as(
      "id_splatted",
      s"${columnPrefix}_is_admin",
      s"${columnPrefix}_is_teacher",
      s"${columnPrefix}_is_student",
      s"${columnPrefix}_is_parent"
    )
  // FIRST_VALUE is only available in SQL - so this is SQL.
  // Rationale: We have to group by after a map to preserve the pk inference, otherwise flink will
  // toss it out and all future joins will not have a unique key.
  tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
  val grouped = tableEnv.sqlQuery(s"""
    SELECT
      id_splatted,
      FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
      FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
      FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
      FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
    FROM ${columnPrefix}_splatted
    GROUP BY id_splatted
  """)
  return table
    .join(grouped, $"id" === $"id_splatted")
    .dropColumns($"id_splatted")
    .renameColumns($"roles".as(s"${columnPrefix}_roles"))
}

@FunctionHint(
  output = new DataTypeHint(
    "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
  )
)
class SplatRolesFunc extends ScalarFunction {
  def eval(roles: Array[String], id: java.lang.Long): Row = {
    val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
    val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
    val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
    val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
    return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(
      Types.LONG,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN
    )
}

On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com
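For what it's worth, the downstream usage described above ("filtered into one table and aggregated, and another table and aggregated") presumably looks roughly like the sketch below. The filter columns are taken from the plan excerpt, and the count aggregate is only a placeholder standing in for the real IDsAgg aggregation; the point is just to illustrate the two branches that both consume orgUsersTable:

// Hypothetical sketch of the two downstream branches; the aggregate is a
// placeholder for IDsAgg, and the alias names are illustrative only.
val adminOrgIds = orgUsersTable
  .filter($"org_user_is_admin")
  .groupBy($"user_id")
  .select($"user_id", $"organization_id".count.as("admin_organization_id_count"))

val teacherOrgIds = orgUsersTable
  .filter($"org_user_is_teacher")
  .groupBy($"user_id")
  .select($"user_id", $"organization_id".count.as("teacher_organization_id_count"))

Because both branches reference the same orgUsersTable instance, the expectation is that the join inside splatRoles would be planned once and its output shared between them.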
cc Brad

On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com
Hi Rex,

I tried a similar example [1] but did not reproduce the issue. Which version of Flink are you using now?

Best,
Yun

[1] The example code:
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Version 1.11.2

On Sun, Dec 6, 2020 at 10:20 PM Yun Gao <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com
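One way to narrow down where the duplication is introduced is to print the optimized plan before wiring up the sinks and look for the Join node appearing more than once. A minimal sketch, assuming Flink 1.11's Table API (where Table.explain() is available) and some final joined table named resultTable as a placeholder for whatever is actually written to the sink:

// Sketch: print the optimized plan and check whether the same Join subtree
// shows up multiple times; `resultTable` is a hypothetical name.
println(resultTable.explain())

If each sink is translated separately, comparing the explain output per sink can also show whether the shared orgUsersTable subtree is being re-planned for every consumer rather than reused.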