CEP memory requirements

CEP memory requirements

Elias Levy
I am observing odd memory behavior with the CEP library and I am wondering if it is expected.

If I write a simple local streaming Flink job that reads from a 65MB compressed file of JSON objects, one per line, parses the JSON, performs a filter operation, and then a keyBy, heap usage is stable, staying below 250MB throughout per VisualVM.

But if I create a CEP pattern that matches nothing (Pattern.begin[T]("foo").where( _ => false )) and apply it to the stream produced by the keyBy (CEP.pattern(stream, pattern).select), heap usage grows steadily until the program terminates, reaching 3GB.

The VisualVM memory profiler appears unable to account for the used heap space. If I add up the Live Bytes column, I get only 100-200 MB.

Any idea what is going on?

Flink 1.2.  Java 8.

Re: CEP memory requirements

Elias Levy
Looking at the code, I gather that 1.2 does not clear the per-key NFA state even when there is no state to keep, whereas this appears to be fixed in the master branch. Is that right?

Re: CEP memory requirements

Dawid Wysakowicz
Yes, you are right: prior to 1.3.0 the per-key state was never cleared. Now, thanks to FLINK-5174, the master branch stores it only when necessary.
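
To make the difference concrete, here is a toy model in plain Scala. It is not Flink's code: NfaState, advance, retainedKeys and the clearEmptyState flag are hypothetical names invented for illustration, and the real operator keeps its state in Flink's keyed state backend rather than in a map. The sketch only assumes what is described above: one variant writes the per-key state back after every event, the other keeps it only when there is something to keep.

```scala
// Toy model only: NOT Flink's implementation. All names here are hypothetical.
object PerKeyStateModel {
  // Stand-in for the per-key NFA state: the partial matches currently in flight.
  final case class NfaState(partialMatches: List[String]) {
    def isEmpty: Boolean = partialMatches.isEmpty
  }

  // Advance the state by one event; an event rejected by the predicate starts no partial match.
  def advance(state: NfaState, event: String, accepts: String => Boolean): NfaState =
    if (accepts(event)) NfaState(event :: state.partialMatches) else state

  // Process (key, event) pairs and return how many keys still hold state afterwards.
  def retainedKeys(
      events: Seq[(String, String)],
      accepts: String => Boolean,
      clearEmptyState: Boolean): Int = {
    val store = scala.collection.mutable.Map.empty[String, NfaState]
    for ((key, event) <- events) {
      val next = advance(store.getOrElse(key, NfaState(Nil)), event, accepts)
      if (clearEmptyState && next.isEmpty) store.remove(key) // keep state only when needed
      else store.update(key, next)                           // always write the state back
    }
    store.size
  }

  def main(args: Array[String]): Unit = {
    // 100000 distinct keys and a predicate that never matches, as in the report above.
    val events = (1 to 100000).map(i => (s"key-$i", "payload"))
    val neverMatches: String => Boolean = _ => false
    println(retainedKeys(events, neverMatches, clearEmptyState = false)) // prints 100000
    println(retainedKeys(events, neverMatches, clearEmptyState = true))  // prints 0
  }
}
```

With a pattern that never matches, the first variant ends up holding one empty state object per distinct key, so retained state grows with the number of keys seen. The second variant holds nothing.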
