CEP issue

kieran .

Hello,

I am building a multi-tenant monitoring application and evaluating different Complex Event Processors (CEP) to see whether one of them could be a solution for what I want to achieve. I have created a small test application that uses Flink and its CEP library, but I have run into problems when monitoring a large number of metrics with patterns and pattern streams. Flink behaves as expected with one or several patterns, each consuming its own PatternStream, but as more are introduced, Flink's memory usage rises quickly and it eventually throws an OutOfMemoryError. My initial idea was to create one pattern and pattern stream for each metric I need to monitor, but there could be many thousands of these.

I create the PatternStream per Pattern like this to monitor a metric:

        Pattern<MetricData, ?> pattern = Pattern.<MetricData> begin( patternName ).subtype( MetricData.class )
                .where(
                (evt -> evt.getValues().get( "max" ).longValue() > 50.0
                        && evt.account_id.equals( accountName )) );

        check.withPattern( pattern )
                .withTimePeriod( Integer.valueOf( 1 ) )
                .withCooldown( Integer.valueOf( 1 ) )
                .withName( checkName )
                .withAlertStatus( AlertStatus.OK )
                .setPatternStream(CEP.pattern(messageStream.keyBy("account_id"), pattern));

To trigger these patterns, I use

        PatternSelectFunction<MetricData, MetricWarning> psf = new PatternSelectFunction<MetricData, MetricWarning>()
        {
            @Override
            public MetricWarning select( Map<String, MetricData> map ) throws Exception
            {
                return new MetricWarning(map.get(patternKey), name, accountId);
            }
        };

        try
        {
            check.getPatternStream().select(psf);
        }
        catch( Exception exception )
        {
            exception.printStackTrace();
        }



The pattern in the example above is tied to a specific stream, which results in one stream per pattern, and that seems to be the problem with this approach. If it were possible to run a single pattern stream and switch the patterns out when needed, that might be a viable solution. Am I approaching this the right way by creating a stream for each pattern?

Would it be possible to create a set of pattern processors that run against a single PatternStream, or is there anything else you could suggest that would let me do this with Flink?
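
To make the question concrete, here is a minimal sketch in plain Java of what I mean by running a set of rules against one stream. It does not use any Flink API; Sample, Rule and RuleSet are hypothetical names made up for illustration (Sample stands in for MetricData), and the accounts and thresholds are invented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch: many threshold rules evaluated against one event stream. */
public class SingleStreamRules {

    /** Hypothetical stand-in for MetricData: one metric sample for one account. */
    static final class Sample {
        final String accountId;
        final String metric;
        final double max;

        Sample(String accountId, String metric, double max) {
            this.accountId = accountId;
            this.metric = metric;
            this.max = max;
        }
    }

    /** Hypothetical rule: fires when a metric's "max" value exceeds a threshold. */
    static final class Rule {
        final String name;
        final String metric;
        final double threshold;

        Rule(String name, String metric, double threshold) {
            this.name = name;
            this.metric = metric;
            this.threshold = threshold;
        }
    }

    /** Rules grouped by account, so each sample is only checked against its own account's rules. */
    static final class RuleSet {
        private final Map<String, List<Rule>> byAccount = new HashMap<>();

        void add(String accountId, Rule rule) {
            byAccount.computeIfAbsent(accountId, k -> new ArrayList<>()).add(rule);
        }

        /** Returns the names of all rules for the sample's account that the sample violates. */
        List<String> evaluate(Sample s) {
            List<String> fired = new ArrayList<>();
            for (Rule r : byAccount.getOrDefault(s.accountId, Collections.<Rule>emptyList())) {
                if (r.metric.equals(s.metric) && s.max > r.threshold) {
                    fired.add(r.name);
                }
            }
            return fired;
        }
    }

    public static void main(String[] args) {
        RuleSet rules = new RuleSet();
        rules.add("acme", new Rule("cpu-high", "cpu", 50.0));
        rules.add("acme", new Rule("disk-high", "disk", 90.0));
        rules.add("globex", new Rule("cpu-high", "cpu", 75.0));

        // Every sample goes through the same evaluation step, whatever the account.
        System.out.println(rules.evaluate(new Sample("acme", "cpu", 63.0)));   // [cpu-high]
        System.out.println(rules.evaluate(new Sample("globex", "cpu", 63.0))); // []
    }
}
```

I assume something like this could live inside a single operator on the keyed stream, with rules added or removed at runtime, but I do not know whether Flink's CEP library supports that.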

Thanks,
- Kieran

Re: CEP issue

rmetzger0
Hi Kieran,

Which state backend are you using for your CEP job? Using RocksDB as the state backend could potentially fix the issue, since it keeps state on local disk rather than on the JVM heap.
What's the number of keys in your stream?
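
The key count matters for a rough size estimate. As a back-of-envelope sketch, assuming each pattern operator keeps one matcher instance per key it has seen (an assumption about the runtime that may not hold for every Flink version), the number of instances grows as patterns times keys. The class name, method name and sizes below are hypothetical, not taken from your job.

```java
/** Back-of-envelope estimate of matcher instances under the stated assumption. */
public class CepStateEstimate {

    /**
     * Assumes one matcher instance per (pattern, key) pair.
     * Throws ArithmeticException if the product overflows a long.
     */
    static long matcherInstances(long patterns, long keys) {
        return Math.multiplyExact(patterns, keys);
    }

    public static void main(String[] args) {
        // Hypothetical sizes: 5,000 patterns over a stream with 1,000 distinct keys.
        System.out.println(matcherInstances(5_000, 1_000)); // 5000000
    }
}
```

If the real numbers are in that range, heap-based state would plausibly be under pressure, which is why the state backend is the first thing to check.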


On Tue, Nov 29, 2016 at 3:18 PM, kieran . <[hidden email]> wrote:
