How can I optimize joins or cache misses in the SQL API?

Marco Villalobos-2
scenario:

A Kafka stream enriched with tables in PostgreSQL.

Let's pretend that Postgres has organizations, departments, and
persons tables, and we want to enrich the Kafka table, which carries
the person id, with the person's full name.  I also want to determine
whether the person id is missing.

This requires a left join.

SELECT o.id, d.id, p.fullname, k.ssn, SUM(k.amount)
FROM purchases k
JOIN organizations o ON o.code = k.organization
JOIN departments d ON d.code = k.department
LEFT JOIN persons FOR SYSTEM_TIME AS OF k.procTime AS p
  ON p.department_id = d.id AND p.ssn = k.ssn
GROUP BY
  TUMBLE(k.procTime, INTERVAL '3' MINUTE), o.id, d.id, p.fullname, k.ssn

Let's say that the TTL for organizations and departments is 12 months,
but for persons it is 1 month.

observations:

If six unique people enter the kafka topic, then that will issue six
separate queries to the database of the form:

SELECT id, ssn, fullname, dob FROM persons WHERE department_id = $1 AND ssn = $2

However, since this is a tumbling window, it would be more efficient
to issue one query with all six key pairs in an IN clause.  Example:

SELECT id, ssn, fullname, dob FROM persons WHERE (department_id, ssn)
IN (($1,$2), ($3,$4), ($5,$6), ($7,$8), ($9,$10), ($11,$12))
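To make the batching idea concrete outside of Flink, here is a minimal Python sketch that turns a collection of (department_id, ssn) pairs into one statement of the shape above. The function name `build_batched_lookup` is hypothetical, and nothing here is a Flink API; it only illustrates how the placeholders and parameters would line up if the lookups were combined by hand.

```python
from typing import List, Sequence, Tuple


def build_batched_lookup(keys: Sequence[Tuple[int, str]]) -> Tuple[str, List[object]]:
    """Return one SQL statement plus its flat parameter list for all key pairs."""
    if not keys:
        raise ValueError("at least one (department_id, ssn) pair is required")
    # De-duplicate while keeping first-seen order, so each person is fetched once.
    unique = list(dict.fromkeys(keys))
    # Pair i uses placeholders $(2i+1) and $(2i+2), PostgreSQL-style numbering.
    groups = [f"(${2 * i + 1}, ${2 * i + 2})" for i in range(len(unique))]
    sql = (
        "SELECT id, ssn, fullname, dob FROM persons "
        f"WHERE (department_id, ssn) IN ({', '.join(groups)})"
    )
    # Flatten the pairs in the same order as the placeholders.
    params = [value for pair in unique for value in pair]
    return sql, params
```

Any key that comes back without a row is a missing person, so the caller still has to compare the result set against the requested keys to keep left-join semantics.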

Q: Is there a way to control that? I don't want the N + 1 query problem.

Q: Are these queries performed asynchronously?  If there were 200000
unique persons, I would not want 200000 synchronous queries.
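For reference, this is roughly what "asynchronous" would mean here: many lookups in flight at once, with a cap so the database is not flooded. The sketch below uses plain Python asyncio rather than anything from Flink; `lookup_all`, `fetch`, and `max_in_flight` are hypothetical names, and `fetch` stands in for whatever coroutine actually queries the database.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Hashable, Iterable, Optional


async def lookup_all(
    keys: Iterable[Hashable],
    fetch: Callable[[Hashable], Awaitable[Optional[object]]],
    max_in_flight: int = 4,
) -> Dict[Hashable, Optional[object]]:
    """Look up every distinct key concurrently, at most max_in_flight at a time."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def guarded(key: Hashable):
        # The semaphore bounds how many fetches run at the same moment.
        async with semaphore:
            return key, await fetch(key)

    # set(keys) drops duplicates so each key is queried only once.
    pairs = await asyncio.gather(*(guarded(key) for key in set(keys)))
    return dict(pairs)
```

A key whose fetch returns None is treated as "person not found", which matches the left-join case in the question.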

Q: Is there a way to preload the persons table, since it changes only
about once every two weeks, and then do a LEFT JOIN on it?
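To spell out what I mean by preloading, here is a small Python sketch, independent of Flink: load the persons rows once into a dictionary keyed by (department_id, ssn), then left join each purchase against it in memory. The names `preload`, `left_join`, and `person_missing` are hypothetical, and the row layout is an assumption based on the columns in the queries above.

```python
from typing import Dict, Iterable, Iterator, Tuple


def preload(rows: Iterable[dict]) -> Dict[Tuple[int, str], dict]:
    """Index persons rows by the join key (department_id, ssn)."""
    return {(row["department_id"], row["ssn"]): row for row in rows}


def left_join(purchases: Iterable[dict], persons_by_key: Dict[Tuple[int, str], dict]) -> Iterator[dict]:
    """Yield every purchase, enriched when a person matches, flagged when not."""
    for purchase in purchases:
        person = persons_by_key.get((purchase["department_id"], purchase["ssn"]))
        yield {
            **purchase,
            # Left-join semantics: keep the purchase even without a match.
            "fullname": person["fullname"] if person else None,
            "person_missing": person is None,
        }
```

With this approach the database is queried once per refresh rather than once per key, at the cost of holding the whole table in memory and serving slightly stale data between refreshes.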

Let's say that the person does not exist. I am impressed that Flink
caches the fact that a person does not exist.  However, I want to
cache an existing person for a month, but if the person does not
exist, I only want to remember that for a day.

Q: Is there a way to control a shorter cache time for non-existent items?
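The behavior I am after can be sketched in a few lines of Python, outside of Flink. The class `HitMissCache` and its parameters are hypothetical; `loader` stands in for the database lookup and returns None when the person does not exist. The only point is that the expiry time is chosen per entry, depending on whether the lookup found something.

```python
import time
from typing import Callable, Dict, Hashable, Optional, Tuple


class HitMissCache:
    """Cache lookups, expiring misses (None) sooner than hits."""

    def __init__(
        self,
        loader: Callable[[Hashable], Optional[object]],
        hit_ttl: float,
        miss_ttl: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._loader = loader
        self._hit_ttl = hit_ttl
        self._miss_ttl = miss_ttl
        self._clock = clock
        # key -> (cached value or None, absolute expiry time)
        self._entries: Dict[Hashable, Tuple[Optional[object], float]] = {}

    def get(self, key: Hashable) -> Optional[object]:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now < entry[1]:
            return entry[0]  # still fresh, hit or remembered miss
        value = self._loader(key)
        # Misses get the short TTL, hits the long one.
        ttl = self._miss_ttl if value is None else self._hit_ttl
        self._entries[key] = (value, now + ttl)
        return value
```

For the numbers in my scenario this would be constructed with `hit_ttl=30 * 86400` and `miss_ttl=86400` (seconds).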

I really like the expressiveness and succinctness of the SQL API in
Flink; however, I am worried that I need to use the DataStream API in
order to control the scenarios above.

I appreciate any advice, thank you.

Re: How can I optimize joins or cache misses in SQL api?

Danny Chan-2
Hi, Marco Villalobos ~

It's nice to see that you chose the SQL API, which is more concise and expressive.

To answer some of your questions:

> Q: Is there a way to control that? I don't want the N + 1 query problem.

No. The SQL runtime evaluates row by row. There may be some internal optimizations that buffer the data first, but there is no logic to combine the ad-hoc queries into one IN query.

> Q: Is there a way to preload persons table, since it changes only
> about once every two weeks and then do a LEFT JOIN on it?

Yes, the temporal table has a configuration option to cache the data, but this feature is disabled by default. [1]

> Q: Is there a way to control a shorter cache time for non-existent items?

You can configure the state TTL of a stream-stream join through [2], or modify the temporal table cache TTL through the options above.


In reply to Marco Villalobos <[hidden email]>, Tuesday, December 8, 2020, 11:51 PM.