Memory on Aggr

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Memory on Aggr

Alberto Ramón
From "Relational Queries on Data Stream in Apache Flink" > Bounday Memory Requirements
(https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU/edit#)

SELECT user, page, COUNT(page) AS pCnt
FROM pageviews

GROUP BY user, page


-Versus-


SELECT user, page, COUNT(page) AS pCnt
FROM pageviews

WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() // only last hour

GROUP BY user, page


I understand:
  • Not use WaterMark to pre-calculate agrr, and save memory
  • Store all events "as is" until the end of window 

are My assumptions true ?


Reply | Threaded
Open this post in threaded view
|

Re: Memory on Aggr

Fabian Hueske-2
First of all, the document only proposes semantics for Flink's support of relational queries on streams.
It does not describe the implementation and in fact most of it is not implemented.

How the queries will be executed would depend on the definition of the table, i.e., whether the tables are derived in append or replace mode.
For the second query we do not necessarily need to "store all events as is" but could do some pre-aggregation depending on the configured update rate.
Watermarks will be used to track time in a query, i.e., to evaluate a predicate like "BETWEEN now() - INTERVAL '1' HOUR AND now()" where now() would be the current watermark time.

There are a couple of tricks one can play to reduce the memory requirements and the implementation should try to optimize for that.
However, it is true that for some queries we will need to keep the complete input relation (within its time bounds) as state.
The good news is that Flink is very good a managing large state and can easily scale to hundreds of nodes.

Did that answer your questions?

2016-11-07 21:33 GMT+01:00 Alberto Ramón <[hidden email]>:
From "Relational Queries on Data Stream in Apache Flink" > Bounday Memory Requirements
(https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU/edit#)

SELECT user, page, COUNT(page) AS pCnt
FROM pageviews

GROUP BY user, page


-Versus-


SELECT user, page, COUNT(page) AS pCnt
FROM pageviews

WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() // only last hour

GROUP BY user, page


I understand:
  • Not use WaterMark to pre-calculate agrr, and save memory
  • Store all events "as is" until the end of window 

are My assumptions true ?



Reply | Threaded
Open this post in threaded view
|

Re: Memory on Aggr

Alberto Ramón
Yes thanks

Perhaps my example is too simple
 select xx, count(), sum() from ttt group by xx
Why the querie value can't be calculated each 2 secs / waterMark arrive ?

I'm try to find the video of: http://flink-forward.org/kb_sessions/scaling-stream-processing-with-apache-flink-to-very-large-state/

2016-11-07 22:02 GMT+01:00 Fabian Hueske <[hidden email]>:
First of all, the document only proposes semantics for Flink's support of relational queries on streams.
It does not describe the implementation and in fact most of it is not implemented.

How the queries will be executed would depend on the definition of the table, i.e., whether the tables are derived in append or replace mode.
For the second query we do not necessarily need to "store all events as is" but could do some pre-aggregation depending on the configured update rate.
Watermarks will be used to track time in a query, i.e., to evaluate a predicate like "BETWEEN now() - INTERVAL '1' HOUR AND now()" where now() would be the current watermark time.

There are a couple of tricks one can play to reduce the memory requirements and the implementation should try to optimize for that.
However, it is true that for some queries we will need to keep the complete input relation (within its time bounds) as state.
The good news is that Flink is very good a managing large state and can easily scale to hundreds of nodes.

Did that answer your questions?

2016-11-07 21:33 GMT+01:00 Alberto Ramón <[hidden email]>:
From "Relational Queries on Data Stream in Apache Flink" > Bounday Memory Requirements
(https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU/edit#)

SELECT user, page, COUNT(page) AS pCnt
FROM pageviews

GROUP BY user, page


-Versus-


SELECT user, page, COUNT(page) AS pCnt
FROM pageviews

WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() // only last hour

GROUP BY user, page


I understand:
  • Not use WaterMark to pre-calculate agrr, and save memory
  • Store all events "as is" until the end of window 

are My assumptions true ?




Reply | Threaded
Open this post in threaded view
|

Re: Memory on Aggr

Fabian Hueske-2
Given the semantics described in the document the query can be computed in principle.
However, if the query is not bounded by time, the required state might grow very large if the number of distinct xx values grows over time.
That's why we will probably enforce a time predicate or meta data that the value domain of xx is of constant size.



2016-11-08 9:04 GMT+01:00 Alberto Ramón <[hidden email]>:
Yes thanks

Perhaps my example is too simple
 select xx, count(), sum() from ttt group by xx
Why the querie value can't be calculated each 2 secs / waterMark arrive ?

I'm try to find the video of: http://flink-forward.org/kb_sessions/scaling-stream-processing-with-apache-flink-to-very-large-state/

2016-11-07 22:02 GMT+01:00 Fabian Hueske <[hidden email]>:
First of all, the document only proposes semantics for Flink's support of relational queries on streams.
It does not describe the implementation and in fact most of it is not implemented.

How the queries will be executed would depend on the definition of the table, i.e., whether the tables are derived in append or replace mode.
For the second query we do not necessarily need to "store all events as is" but could do some pre-aggregation depending on the configured update rate.
Watermarks will be used to track time in a query, i.e., to evaluate a predicate like "BETWEEN now() - INTERVAL '1' HOUR AND now()" where now() would be the current watermark time.

There are a couple of tricks one can play to reduce the memory requirements and the implementation should try to optimize for that.
However, it is true that for some queries we will need to keep the complete input relation (within its time bounds) as state.
The good news is that Flink is very good a managing large state and can easily scale to hundreds of nodes.

Did that answer your questions?

2016-11-07 21:33 GMT+01:00 Alberto Ramón <[hidden email]>:
From "Relational Queries on Data Stream in Apache Flink" > Bounday Memory Requirements
(https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU/edit#)

SELECT user, page, COUNT(page) AS pCnt
FROM pageviews

GROUP BY user, page


-Versus-


SELECT user, page, COUNT(page) AS pCnt
FROM pageviews

WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() // only last hour

GROUP BY user, page


I understand:
  • Not use WaterMark to pre-calculate agrr, and save memory
  • Store all events "as is" until the end of window 

are My assumptions true ?





Reply | Threaded
Open this post in threaded view
|

Re: Memory on Aggr

Alberto Ramón
thanks ¡¡
Now its clear for me


2016-11-08 9:23 GMT+01:00 Fabian Hueske <[hidden email]>:
Given the semantics described in the document the query can be computed in principle.
However, if the query is not bounded by time, the required state might grow very large if the number of distinct xx values grows over time.
That's why we will probably enforce a time predicate or meta data that the value domain of xx is of constant size.



2016-11-08 9:04 GMT+01:00 Alberto Ramón <[hidden email]>:
Yes thanks

Perhaps my example is too simple
 select xx, count(), sum() from ttt group by xx
Why the querie value can't be calculated each 2 secs / waterMark arrive ?

I'm try to find the video of: http://flink-forward.org/kb_sessions/scaling-stream-processing-with-apache-flink-to-very-large-state/

2016-11-07 22:02 GMT+01:00 Fabian Hueske <[hidden email]>:
First of all, the document only proposes semantics for Flink's support of relational queries on streams.
It does not describe the implementation and in fact most of it is not implemented.

How the queries will be executed would depend on the definition of the table, i.e., whether the tables are derived in append or replace mode.
For the second query we do not necessarily need to "store all events as is" but could do some pre-aggregation depending on the configured update rate.
Watermarks will be used to track time in a query, i.e., to evaluate a predicate like "BETWEEN now() - INTERVAL '1' HOUR AND now()" where now() would be the current watermark time.

There are a couple of tricks one can play to reduce the memory requirements and the implementation should try to optimize for that.
However, it is true that for some queries we will need to keep the complete input relation (within its time bounds) as state.
The good news is that Flink is very good a managing large state and can easily scale to hundreds of nodes.

Did that answer your questions?

2016-11-07 21:33 GMT+01:00 Alberto Ramón <[hidden email]>:
From "Relational Queries on Data Stream in Apache Flink" > Bounday Memory Requirements
(https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU/edit#)

SELECT user, page, COUNT(page) AS pCnt
FROM pageviews

GROUP BY user, page


-Versus-


SELECT user, page, COUNT(page) AS pCnt
FROM pageviews

WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() // only last hour

GROUP BY user, page


I understand:
  • Not use WaterMark to pre-calculate agrr, and save memory
  • Store all events "as is" until the end of window 

are My assumptions true ?