Duplicate operators generated by plan

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Duplicate operators generated by plan

Rex Fenley
Hello,

I'm running into an issue where my execution plan is creating the same exact join operator multiple times simply because the subsequent operator filters on a different boolean value. This is a massive duplication of storage and work. The filtered operators which follow result in only a small set of elements filtered out per set too.

eg. of two separate operators that are equal

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent]

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent])

Which are entirely the same datasets being processed.

The first one points to
GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS admin_organization_ids])

The second one points to
GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS teacher_organization_ids])

And these are both intersecting sets of data though slightly different. I don't see why that would make the 1 join from before split into 2 though. There's even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell flink to not duplicate operators where it doesn't need to?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Duplicate operators generated by plan

Yun Gao
Hi Rex,

    Could  you also attach one example for these sql / table ? And one possible issue to confirm is that does the operators with the same names also have the same inputs ?

Best,
Yun
------------------Original Mail ------------------
Sender:Rex Fenley <[hidden email]>
Send Date:Fri Dec 4 02:55:41 2020
Recipients:user <[hidden email]>
Subject:Duplicate operators generated by plan
Hello,

I'm running into an issue where my execution plan is creating the same exact join operator multiple times simply because the subsequent operator filters on a different boolean value. This is a massive duplication of storage and work. The filtered operators which follow result in only a small set of elements filtered out per set too.

eg. of two separate operators that are equal

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent]

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent])

Which are entirely the same datasets being processed.

The first one points to
GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS admin_organization_ids])

The second one points to
GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS teacher_organization_ids])

And these are both intersecting sets of data though slightly different. I don't see why that would make the 1 join from before split into 2 though. There's even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell flink to not duplicate operators where it doesn't need to?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Duplicate operators generated by plan

Rex Fenley
Yes, the same exact input operators go into both joins.

The chunk of code for the joins from the specific part of the plan I showed is as follows. The orgUsersTable is later filtered into one table and aggregated and another table and aggregated. The planner seems to duplicate orgUsersTable into 2 operators even though I create only 1 of it.

// in the main function
val orgUsersTable = splatRoles(
this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
OrgUsersRoleSplatPrefix,
this.tableEnv
)

// helper function
def splatRoles(
table: Table,
columnPrefix: String,
tableEnv: TableEnvironment
): Table = {
// Flink does not have a contains function so we have to splat out our role array's contents
// and join it to the originating table.
val func = new SplatRolesFunc()
val splatted = table
.map(func($"roles", $"id"))
.as(
"id_splatted",
s"${columnPrefix}_is_admin",
s"${columnPrefix}_is_teacher",
s"${columnPrefix}_is_student",
s"${columnPrefix}_is_parent"
)
// FIRST_VALUE is only available in SQL - so this is SQL.
// Rationale: We have to group by after a map to preserve the pk inference, otherwise flink will
// toss it out and all future joins will not have a unique key.
tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
val grouped = tableEnv.sqlQuery(s"""
SELECT
id_splatted,
FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
FROM ${columnPrefix}_splatted
GROUP BY id_splatted
""")
return table
.join(grouped, $"id" === $"id_splatted")
.dropColumns($"id_splatted")
.renameColumns($"roles".as(s"${columnPrefix}_roles"))
}

@FunctionHint(
output = new DataTypeHint(
"(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
)
)
class SplatRolesFunc extends ScalarFunction {
def eval(roles: Array[String], id: java.lang.Long): Row = {
val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
}

override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
Types.ROW(
Types.LONG,
Types.BOOLEAN,
Types.BOOLEAN,
Types.BOOLEAN,
Types.BOOLEAN
)
}


On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <[hidden email]> wrote:
Hi Rex,

    Could  you also attach one example for these sql / table ? And one possible issue to confirm is that does the operators with the same names also have the same inputs ?

Best,
Yun
------------------Original Mail ------------------
Sender:Rex Fenley <[hidden email]>
Send Date:Fri Dec 4 02:55:41 2020
Recipients:user <[hidden email]>
Subject:Duplicate operators generated by plan
Hello,

I'm running into an issue where my execution plan is creating the same exact join operator multiple times simply because the subsequent operator filters on a different boolean value. This is a massive duplication of storage and work. The filtered operators which follow result in only a small set of elements filtered out per set too.

eg. of two separate operators that are equal

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent]

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent])

Which are entirely the same datasets being processed.

The first one points to
GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS admin_organization_ids])

The second one points to
GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS teacher_organization_ids])

And these are both intersecting sets of data though slightly different. I don't see why that would make the 1 join from before split into 2 though. There's even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell flink to not duplicate operators where it doesn't need to?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Duplicate operators generated by plan

Rex Fenley
cc Brad

On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley <[hidden email]> wrote:
Yes, the same exact input operators go into both joins.

The chunk of code for the joins from the specific part of the plan I showed is as follows. The orgUsersTable is later filtered into one table and aggregated and another table and aggregated. The planner seems to duplicate orgUsersTable into 2 operators even though I create only 1 of it.

// in the main function
val orgUsersTable = splatRoles(
this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
OrgUsersRoleSplatPrefix,
this.tableEnv
)

// helper function
def splatRoles(
table: Table,
columnPrefix: String,
tableEnv: TableEnvironment
): Table = {
// Flink does not have a contains function so we have to splat out our role array's contents
// and join it to the originating table.
val func = new SplatRolesFunc()
val splatted = table
.map(func($"roles", $"id"))
.as(
"id_splatted",
s"${columnPrefix}_is_admin",
s"${columnPrefix}_is_teacher",
s"${columnPrefix}_is_student",
s"${columnPrefix}_is_parent"
)
// FIRST_VALUE is only available in SQL - so this is SQL.
// Rationale: We have to group by after a map to preserve the pk inference, otherwise flink will
// toss it out and all future joins will not have a unique key.
tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
val grouped = tableEnv.sqlQuery(s"""
SELECT
id_splatted,
FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
FROM ${columnPrefix}_splatted
GROUP BY id_splatted
""")
return table
.join(grouped, $"id" === $"id_splatted")
.dropColumns($"id_splatted")
.renameColumns($"roles".as(s"${columnPrefix}_roles"))
}

@FunctionHint(
output = new DataTypeHint(
"(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
)
)
class SplatRolesFunc extends ScalarFunction {
def eval(roles: Array[String], id: java.lang.Long): Row = {
val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
}

override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
Types.ROW(
Types.LONG,
Types.BOOLEAN,
Types.BOOLEAN,
Types.BOOLEAN,
Types.BOOLEAN
)
}


On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <[hidden email]> wrote:
Hi Rex,

    Could  you also attach one example for these sql / table ? And one possible issue to confirm is that does the operators with the same names also have the same inputs ?

Best,
Yun
------------------Original Mail ------------------
Sender:Rex Fenley <[hidden email]>
Send Date:Fri Dec 4 02:55:41 2020
Recipients:user <[hidden email]>
Subject:Duplicate operators generated by plan
Hello,

I'm running into an issue where my execution plan is creating the same exact join operator multiple times simply because the subsequent operator filters on a different boolean value. This is a massive duplication of storage and work. The filtered operators which follow result in only a small set of elements filtered out per set too.

eg. of two separate operators that are equal

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent]

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent])

Which are entirely the same datasets being processed.

The first one points to
GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS admin_organization_ids])

The second one points to
GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS teacher_organization_ids])

And these are both intersecting sets of data though slightly different. I don't see why that would make the 1 join from before split into 2 though. There's even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell flink to not duplicate operators where it doesn't need to?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Re: Duplicate operators generated by plan

Yun Gao
Hi Rex,

   I tried a similar example[1] but did not reproduce the issue, which version of Flink you are using now ?

Best,
 Yun


 
[1] The example code:
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.setRestartStrategy(RestartStrategies.noRestart());
bsEnv.setParallelism(1);

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

DataStream<Tuple2<Integer, String>> source = bsEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, String>>() {

@Override
public void run(SourceContext<Tuple2<Integer, String>> sourceContext) throws Exception {
sourceContext.collect(new Tuple2<>(0, "test"));
}

@Override
public void cancel() {

}
});

Table table = bsTableEnv.fromDataStream(
source, $("id"), $("name"));
Table table2 = table.select(call("abs", $("id")), $("name"))
.as("new_id", "new_name");

bsTableEnv.createTemporaryView("view", table2);
Table handled = bsTableEnv.sqlQuery("select new_id, FIRST_VALUE(new_name) as new_name from view group by new_id");

Table ret = table.join(handled)
.where($("id").isEqual($("new_id")))
.select($("id"), $("name"), $("new_name"));
System.out.println(ret.explain());

DataStream<Tuple2<Boolean, Row>> row = bsTableEnv.toRetractStream(ret, Row.class);
row.addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {

}
});

System.out.println(bsEnv.getStreamGraph().getStreamingPlanAsJSON());
------------------------------------------------------------------
Sender:Rex Fenley<[hidden email]>
Date:2020/12/04 14:18:21
Recipient:Yun Gao<[hidden email]>
Cc:user<[hidden email]>; Brad Davis<[hidden email]>
Theme:Re: Duplicate operators generated by plan

cc Brad

On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley <[hidden email]> wrote:
Yes, the same exact input operators go into both joins.

The chunk of code for the joins from the specific part of the plan I showed is as follows. The orgUsersTable is later filtered into one table and aggregated and another table and aggregated. The planner seems to duplicate orgUsersTable into 2 operators even though I create only 1 of it.

// in the main function
val orgUsersTable = splatRoles(
this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
OrgUsersRoleSplatPrefix,
this.tableEnv
)

// helper function
def splatRoles(
table: Table,
columnPrefix: String,
tableEnv: TableEnvironment
): Table = {
// Flink does not have a contains function so we have to splat out our role array's contents
// and join it to the originating table.
val func = new SplatRolesFunc()
val splatted = table
.map(func($"roles", $"id"))
.as(
"id_splatted",
s"${columnPrefix}_is_admin",
s"${columnPrefix}_is_teacher",
s"${columnPrefix}_is_student",
s"${columnPrefix}_is_parent"
)
// FIRST_VALUE is only available in SQL - so this is SQL.
// Rationale: We have to group by after a map to preserve the pk inference, otherwise flink will
// toss it out and all future joins will not have a unique key.
tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
val grouped = tableEnv.sqlQuery(s"""
SELECT
id_splatted,
FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
FROM ${columnPrefix}_splatted
GROUP BY id_splatted
""")
return table
.join(grouped, $"id" === $"id_splatted")
.dropColumns($"id_splatted")
.renameColumns($"roles".as(s"${columnPrefix}_roles"))
}

@FunctionHint(
output = new DataTypeHint(
"(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
)
)
class SplatRolesFunc extends ScalarFunction {
def eval(roles: Array[String], id: java.lang.Long): Row = {
val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
}

override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
Types.ROW(
Types.LONG,
Types.BOOLEAN,
Types.BOOLEAN,
Types.BOOLEAN,
Types.BOOLEAN
)
}


On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <[hidden email]> wrote:
Hi Rex,

    Could  you also attach one example for these sql / table ? And one possible issue to confirm is that does the operators with the same names also have the same inputs ?

Best,
Yun
------------------Original Mail ------------------
Sender:Rex Fenley <[hidden email]>
Send Date:Fri Dec 4 02:55:41 2020
Recipients:user <[hidden email]>
Subject:Duplicate operators generated by plan
Hello,

I'm running into an issue where my execution plan is creating the same exact join operator multiple times simply because the subsequent operator filters on a different boolean value. This is a massive duplication of storage and work. The filtered operators which follow result in only a small set of elements filtered out per set too.

eg. of two separate operators that are equal

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent]

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent])

Which are entirely the same datasets being processed.

The first one points to
GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS admin_organization_ids])

The second one points to
GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS teacher_organization_ids])

And these are both intersecting sets of data though slightly different. I don't see why that would make the 1 join from before split into 2 though. There's even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell flink to not duplicate operators where it doesn't need to?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


Reply | Threaded
Open this post in threaded view
|

Re: Re: Duplicate operators generated by plan

Rex Fenley
Version 1.11.2

On Sun, Dec 6, 2020 at 10:20 PM Yun Gao <[hidden email]> wrote:
Hi Rex,

   I tried a similar example[1] but did not reproduce the issue, which version of Flink you are using now ?

Best,
 Yun


 
[1] The example code:
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.setRestartStrategy(RestartStrategies.noRestart());
bsEnv.setParallelism(1);

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

DataStream<Tuple2<Integer, String>> source = bsEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, String>>() {

@Override
public void run(SourceContext<Tuple2<Integer, String>> sourceContext) throws Exception {
sourceContext.collect(new Tuple2<>(0, "test"));
}

@Override
public void cancel() {

}
});

Table table = bsTableEnv.fromDataStream(
source, $("id"), $("name"));
Table table2 = table.select(call("abs", $("id")), $("name"))
.as("new_id", "new_name");

bsTableEnv.createTemporaryView("view", table2);
Table handled = bsTableEnv.sqlQuery("select new_id, FIRST_VALUE(new_name) as new_name from view group by new_id");

Table ret = table.join(handled)
.where($("id").isEqual($("new_id")))
.select($("id"), $("name"), $("new_name"));
System.out.println(ret.explain());

DataStream<Tuple2<Boolean, Row>> row = bsTableEnv.toRetractStream(ret, Row.class);
row.addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {

}
});

System.out.println(bsEnv.getStreamGraph().getStreamingPlanAsJSON());
------------------------------------------------------------------
Sender:Rex Fenley<[hidden email]>
Date:2020/12/04 14:18:21
Recipient:Yun Gao<[hidden email]>
Cc:user<[hidden email]>; Brad Davis<[hidden email]>
Theme:Re: Duplicate operators generated by plan

cc Brad

On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley <[hidden email]> wrote:
Yes, the same exact input operators go into both joins.

The chunk of code for the joins from the specific part of the plan I showed is as follows. The orgUsersTable is later filtered into one table and aggregated and another table and aggregated. The planner seems to duplicate orgUsersTable into 2 operators even though I create only 1 of it.

// in the main function
val orgUsersTable = splatRoles(
this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
OrgUsersRoleSplatPrefix,
this.tableEnv
)

// helper function
def splatRoles(
table: Table,
columnPrefix: String,
tableEnv: TableEnvironment
): Table = {
// Flink does not have a contains function so we have to splat out our role array's contents
// and join it to the originating table.
val func = new SplatRolesFunc()
val splatted = table
.map(func($"roles", $"id"))
.as(
"id_splatted",
s"${columnPrefix}_is_admin",
s"${columnPrefix}_is_teacher",
s"${columnPrefix}_is_student",
s"${columnPrefix}_is_parent"
)
// FIRST_VALUE is only available in SQL - so this is SQL.
// Rationale: We have to group by after a map to preserve the pk inference, otherwise flink will
// toss it out and all future joins will not have a unique key.
tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
val grouped = tableEnv.sqlQuery(s"""
SELECT
id_splatted,
FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
FROM ${columnPrefix}_splatted
GROUP BY id_splatted
""")
return table
.join(grouped, $"id" === $"id_splatted")
.dropColumns($"id_splatted")
.renameColumns($"roles".as(s"${columnPrefix}_roles"))
}

@FunctionHint(
output = new DataTypeHint(
"(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
)
)
class SplatRolesFunc extends ScalarFunction {
def eval(roles: Array[String], id: java.lang.Long): Row = {
val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
}

override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
Types.ROW(
Types.LONG,
Types.BOOLEAN,
Types.BOOLEAN,
Types.BOOLEAN,
Types.BOOLEAN
)
}


On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <[hidden email]> wrote:
Hi Rex,

    Could  you also attach one example for these sql / table ? And one possible issue to confirm is that does the operators with the same names also have the same inputs ?

Best,
Yun
------------------Original Mail ------------------
Sender:Rex Fenley <[hidden email]>
Send Date:Fri Dec 4 02:55:41 2020
Recipients:user <[hidden email]>
Subject:Duplicate operators generated by plan
Hello,

I'm running into an issue where my execution plan is creating the same exact join operator multiple times simply because the subsequent operator filters on a different boolean value. This is a massive duplication of storage and work. The filtered operators which follow result in only a small set of elements filtered out per set too.

eg. of two separate operators that are equal

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent]

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent])

Which are entirely the same datasets being processed.

The first one points to
GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS admin_organization_ids])

The second one points to
GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS teacher_organization_ids])

And these are both intersecting sets of data though slightly different. I don't see why that would make the 1 join from before split into 2 though. There's even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell flink to not duplicate operators where it doesn't need to?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US




--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US