Options for limiting state size in TableAPI&SQL

sofya
Hi, 
I'm investigating a SQL job that uses a time-windowed join and whose state is large and keeps growing. The join syntax is closest to the interval join (https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html).

A few questions:
1. Am I correct in understanding that State TTL is generally not applicable to TableAPI&SQL, so that we cannot use State TTL to limit the state size of a join?

2. It seems that Flink should be able to expire state even without explicit settings, based on this: "In TableAPI&SQL and DataStream, the window aggregation and time-windowed join will clear expired state using Timers which is triggered by watermark." (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html)

To clarify: Does this mean that Flink is expected to detect and clear expired state without any explicit configuration?

3. I've looked into setting the idle state retention time. From what I can understand, this particular setting is appropriate for my use case.  "TableConfig#setIdleStateRetentionTime in TableAPI&SQL is a job level configuration which will enable state ttl for all non-time-based operator states." (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html)

To clarify: Would enabling this setting keep the state from growing? Is it only available for the Blink planner? We are currently using the StreamPlanner. Is there any way to limit the retention of idle state in applications that use the StreamPlanner?
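The min/max semantics of idle state retention can be sketched with a small stdlib-only Python simulation. This is not Flink code: the class and method names are hypothetical, time is passed in explicitly as processing time, and the sketch assumes the semantics of the minimum/maximum retention times passed to `TableConfig#setIdleStateRetentionTime`, i.e. a key's state is kept for at least `min_retention` and at most `max_retention` after its last access. As quoted above, this kind of retention targets non-time-based operator state.

```python
import heapq


class IdleStateRetention:
    """Hypothetical simulation of per-key idle-state retention with
    min/max retention times (not Flink's actual implementation)."""

    def __init__(self, min_retention: int, max_retention: int) -> None:
        if max_retention < min_retention:
            raise ValueError("max_retention must be >= min_retention")
        self.min_retention = min_retention
        self.max_retention = max_retention
        self.state = {}         # key -> accumulated value
        self.cleanup_time = {}  # key -> currently registered cleanup time
        self.timers = []        # min-heap of (cleanup_time, key)

    def access(self, key, value: int, now: int) -> None:
        """Update the state for `key` and (re)register its cleanup timer."""
        self.state[key] = self.state.get(key, 0) + value
        current = self.cleanup_time.get(key)
        # Only move the timer when the existing one would fire before
        # now + min_retention; otherwise the registered timer is kept.
        if current is None or now + self.min_retention > current:
            new_time = now + self.max_retention
            self.cleanup_time[key] = new_time
            heapq.heappush(self.timers, (new_time, key))

    def advance_time(self, now: int) -> None:
        """Fire all timers with time <= now and drop state of idle keys."""
        while self.timers and self.timers[0][0] <= now:
            fire_time, key = heapq.heappop(self.timers)
            # Skip stale timers that were superseded by a later registration.
            if self.cleanup_time.get(key) == fire_time:
                del self.cleanup_time[key]
                del self.state[key]


if __name__ == "__main__":
    retention = IdleStateRetention(min_retention=10, max_retention=15)
    retention.access("k1", 1, now=0)  # cleanup registered at t=15
    retention.access("k1", 1, now=3)  # 3 + 10 <= 15, so the timer stays at t=15
    retention.access("k2", 5, now=8)  # cleanup registered at t=23
    retention.advance_time(15)
    print(retention.state)            # {'k2': 5}
```

In this sketch, the gap between the two bounds means a frequently accessed key does not re-register its timer on every access.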

Thanks ahead,
Sofya

Re: Options for limiting state size in TableAPI&SQL

Danny Chan-2
Hi, Sofya T. Irwin ~

Can you share your use case, i.e. why you need a time-windowed join there?

The time-windowed join is not supported in SQL yet, and I'd like to hear your opinion on whether it is necessary to support it in SQL.


In reply to Sofya T. Irwin <[hidden email]>, Thu, Jul 30, 2020 at 10:44 PM.

Re: Options for limiting state size in TableAPI&SQL

sofya
Hi Danny,

Thank you for your response.
I'm trying to join two streams that are both fairly high volume. My join looks like this:

  SELECT
    A.rowtime AS rowtime,
    A.foo,
    B.bar
  FROM A
  LEFT JOIN B
    ON A.foo = B.foo
    AND A.rowtime BETWEEN B.rowtime - INTERVAL '1' HOUR AND B.rowtime

When I run this SQL, the state size metric follows a sawtooth pattern whose overall level keeps rising.
I have disabled this query for now because I'm concerned it could impact other jobs.

Based on your statement, is the time-windowed join not supported in SQL?
Is there another way to make sure that the state only holds recent data?

Thank you,
Sofya

In reply to Danny Chan <[hidden email]>, Thu, Aug 27, 2020 at 10:49 PM.

Re: Options for limiting state size in TableAPI&SQL

Danny Chan
Thanks for sharing ~

The query you gave is actually an interval join [1]; a windowed join joins the elements of two streams that share a common key and lie in the same window, see [2].
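To contrast the two kinds of join, here is a stdlib-only Python sketch over hypothetical in-memory rows `(key, timestamp_in_minutes)`, using inner-join semantics only. `tumbling_window_join` and `interval_join` are illustrative helpers, not Flink APIs; the interval bounds mirror the predicate `A.rowtime BETWEEN B.rowtime - INTERVAL '1' HOUR AND B.rowtime`, i.e. `lower=-60, upper=0`.

```python
from itertools import product

# Rows are (key, timestamp_in_minutes); the data is hypothetical.
A = [("x", 50), ("x", 70)]
B = [("x", 65), ("x", 100)]


def tumbling_window_join(left, right, size):
    """Join rows with equal keys whose timestamps fall into the same
    tumbling window [n * size, (n + 1) * size)."""
    return sorted(
        (a, b)
        for a, b in product(left, right)
        if a[0] == b[0] and a[1] // size == b[1] // size
    )


def interval_join(left, right, lower, upper):
    """Join rows with equal keys where b.ts + lower <= a.ts <= b.ts + upper
    (both bounds inclusive, like SQL BETWEEN)."""
    return sorted(
        (a, b)
        for a, b in product(left, right)
        if a[0] == b[0] and b[1] + lower <= a[1] <= b[1] + upper
    )


if __name__ == "__main__":
    # [(('x', 70), ('x', 65)), (('x', 70), ('x', 100))]
    print(tumbling_window_join(A, B, size=60))
    # [(('x', 50), ('x', 65)), (('x', 50), ('x', 100)), (('x', 70), ('x', 100))]
    print(interval_join(A, B, lower=-60, upper=0))
```

Rows 15 minutes apart (`A` at 50, `B` at 65) join under the interval predicate but fall into different one-hour windows, while the window join pairs `A` at 70 with `B` at 65 even though `B` precedes `A`.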

In theory, for an interval join, state is cleaned up periodically based on the watermark and the allowed lateness, once the time range on the right-hand side (RHS) can be considered “late”.
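The cleanup described above can be illustrated with a simplified, stdlib-only Python simulation of a LEFT interval join that uses the predicate from Sofya's query (timestamps in minutes, so `INTERVAL '1' HOUR` becomes `upper_gap=60`). All names are hypothetical and this is not Flink's operator: it assumes that after a watermark `W`, no row with a timestamp at or below `W - allowed_lateness` will arrive, and it evicts buffered rows that can no longer match any future row. In this sketch, if the watermark never advances, nothing is ever evicted.

```python
from dataclasses import dataclass, field


@dataclass
class IntervalLeftJoin:
    """Hypothetical simulation of a LEFT interval join with the predicate
    b.ts - upper_gap <= a.ts <= b.ts; rows are (key, ts)."""

    upper_gap: int  # 60 for INTERVAL '1' HOUR, in minutes
    allowed_lateness: int = 0
    left: list = field(default_factory=list)    # buffered [key, ts, matched]
    right: list = field(default_factory=list)   # buffered (key, ts)
    output: list = field(default_factory=list)  # emitted (a_row, b_row or None)

    def _matches(self, a_ts, b_ts):
        return b_ts - self.upper_gap <= a_ts <= b_ts

    def on_left(self, key, ts):
        row = [key, ts, False]
        for b_key, b_ts in self.right:
            if b_key == key and self._matches(ts, b_ts):
                self.output.append(((key, ts), (b_key, b_ts)))
                row[2] = True
        self.left.append(row)

    def on_right(self, key, ts):
        for row in self.left:
            if row[0] == key and self._matches(row[1], ts):
                self.output.append(((row[0], row[1]), (key, ts)))
                row[2] = True
        self.right.append((key, ts))

    def on_watermark(self, watermark):
        # Assumption: any later row has ts > watermark - allowed_lateness.
        horizon = watermark - self.allowed_lateness
        kept = []
        for key, ts, matched in self.left:
            # No future B row can satisfy ts <= b.ts <= ts + upper_gap.
            if ts + self.upper_gap <= horizon:
                if not matched:
                    self.output.append(((key, ts), None))  # LEFT JOIN: pad with NULL
            else:
                kept.append([key, ts, matched])
        self.left = kept
        # No future A row can satisfy b.ts - upper_gap <= a.ts <= b.ts.
        self.right = [(k, ts) for k, ts in self.right if ts > horizon]

    def state_size(self):
        return len(self.left) + len(self.right)


if __name__ == "__main__":
    join = IntervalLeftJoin(upper_gap=60)
    join.on_left("x", 0)
    join.on_right("x", 30)    # 30 - 60 <= 0 <= 30, so it joins A("x", 0)
    join.on_left("y", 10)     # no matching B row yet
    join.on_watermark(50)
    print(join.state_size())  # 2: both A rows may still match future B rows
    join.on_watermark(70)
    print(join.state_size())  # 0
    print(join.output)        # [(('x', 0), ('x', 30)), (('y', 10), None)]
```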

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html

Best,
Danny Chan
In reply to Sofya T. Irwin <[hidden email]>, Aug 29, 2020, 12:59 AM (+0800).