Flink CEP : Doesn't generate output

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink CEP : Doesn't generate output

dhanuka.priyanath
Hi All,

I have used Flink CEP to filter some events and generate some alerts based on certain conditions. But unfortunately doesn't print any result. I have attached source code herewith, could you please help me on this.

--------------------------------


package org.monitoring.stream.analytics;

import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.shaded.org.apache.commons.lang3.StringUtils;
import org.monitoring.stream.analytics.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MultiMap;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;


public class FlinkCEP {
    private final static Logger LOGGER = LoggerFactory.getLogger(FlinkCEP.class);

    public static void main(String[] args) throws Exception {

String query = FileHandler.readInputStream(FileHandler.getResourceAsStream("query.sql"));
if (query == null) {
    LOGGER.error("*****************  Can't read resources ************************");
} else {
    LOGGER.info("======================== " + query + " =============================");
}
Properties props = FileHandler.loadResourceProperties("application.properties");
Properties kConsumer = FileHandler.loadResourceProperties("consumer.properties");
Properties kProducer = FileHandler.loadResourceProperties("producer.properties");
String hzConfig = FileHandler.readInputStream(FileHandler.getResourceAsStream("hazelcast-client.xml"));
String schemaContent = FileHandler.readInputStream(FileHandler.getResourceAsStream("IRIC-schema.json"));

props.setProperty("auto.offset.reset", "latest");
props.setProperty("flink.starting-position", "latest");
Map<String, String> tempMap = new HashMap<>();
for (final String name : props.stringPropertyNames())
    tempMap.put(name, props.getProperty(name));
final ParameterTool params = ParameterTool.fromMap(tempMap);
String jobName = props.getProperty(ApplicationConfig.JOB_NAME);

LOGGER.info("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Desktop Responsibility Start %%%%%%%%%%%%%%%%%%%%%%");

LOGGER.info("$$$$$$$$$$$$$$$$$$$$$$$ Hz instance name " + props.toString());
HazelcastInstance hzInst = HazelcastUtils.getClient(hzConfig, "");

LOGGER.info("============================== schema " + schemaContent);

MultiMap<String, String> distributedMap = hzInst.getMultiMap("masterDataSynch");
distributedMap.put(jobName, query);

LOGGER.info("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Desktop Responsibility End %%%%%%%%%%%%%%%%%%%%%%%%%");

Collection<String> queries = distributedMap.get(jobName);
Set<String> rules = new HashSet<>(queries);
LOGGER.info("============================== query" + query);
rules.add(query);
hzInst.getLifecycleService().shutdown();
final String sourceTable = "dataTable";

String paral = props.getProperty(ApplicationConfig.FLINK_PARALLEL_TASK);
String noOfOROperatorsValue = props.getProperty(ApplicationConfig.FLINK_NUMBER_OF_OR_OPERATORS);
int noOfOROperators = 50;
if(StringUtils.isNoneBlank(noOfOROperatorsValue)) {
    noOfOROperators = Integer.parseInt(noOfOROperatorsValue);
}
List<List<String>> subQueries = chunk(new ArrayList(rules), noOfOROperators);

// define a schema

// setup streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(300000); // 300 seconds
env.getConfig().setGlobalJobParameters(params);
// env.getConfig().enableObjectReuse();
env.getConfig().setUseSnapshotCompression(true);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
env.setParallelism(Integer.parseInt(paral));
/* env.setStateBackend(new RocksDBStateBackend(env.getStateBackend(), true)); */

FlinkKafkaConsumer010<Event> kafka = new FlinkKafkaConsumer010<>("testin", new EventDeSerializer(), kConsumer);
final SourceFunction<Event> source = kafka;
DataStream<Event> events = env.addSource(source);

DataStream<Event> filteredEvents = events
// partition on the address to make sure equal addresses
// end up in the same state machine flatMap function
.keyBy(Event::getTiggerID);
filteredEvents.print();
//create pattern
       Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
        .where(new SimpleCondition<Event>() {

    private static final long serialVersionUID = 1L;

    @Override
            public boolean filter(Event event) throws Exception {
              return event.getInterceptID().equals(4508724) && event.getProviderInfo().equals("Dhanuka") ;
              //&& event.getLIID().matches(".*193400835.*");
            }
          })
          .or(new SimpleCondition<Event>() {

    private static final long serialVersionUID = 1L;

    @Override
            public boolean filter(Event event) throws Exception {
        return event.getInterceptID().equals(4508724) && event.getProviderInfo().equals("Dhanuka") ;
        //&& event.getLIID().matches(".*193400835.*");
            }
          });
       
        PatternStream<Event> patternStream = CEP.pattern(filteredEvents, pattern);
        
        DataStream<Alert> result = patternStream.select(new PatternSelectFunction<Event, Alert>() {
            private static final long serialVersionUID = 1L;

    @Override
    public Alert select(Map<String, List<Event>> pattern) throws Exception {
return parseMatch(pattern);
    }

    private Alert parseMatch(Map<String, List<Event>> pattern) {
List<Event> events = pattern.get("first");
Alert alert = null;
try {
    alert =  new Alert(JSONUtils.writeListToJsonArray(events));
} catch (IOException e) {
    LOGGER.error(e.getMessage());
    alert = new Alert("[]");
}
return alert;
    }   

        });
        
        FlinkKafkaProducer010<Alert> producer = new FlinkKafkaProducer010<>("testout", new AlertSerializer(), kProducer);
        result.addSink(producer);

        result.print();
       
env.execute(props.getProperty(ApplicationConfig.JOB_NAME));
System.out.println("======================= Successfully Deployed ========================");
    }
        

    public static class MyTime extends ScalarFunction {

private static final long serialVersionUID = 1L;

public MyTime(long timeMills) {
    super();
}

public String eval(long timeMills) {
    return new Timestamp(timeMills).toString();
}

    }

    public static class Now extends ScalarFunction {

private static final long serialVersionUID = 1L;

public Now() {
    super();
}

public String eval() {
    return new Timestamp(System.currentTimeMillis()).toString();
}

    }

    public static List<List<String>> chunk(final List<String> arrayList, final int chunkSize) {

int rest = arrayList.size() % chunkSize;
int noOfChunks = arrayList.size() / chunkSize;
int start = 0;
int end = 0;
int count = (rest == 0 ? noOfChunks: noOfChunks +1);

System.out.println("rest  " + rest + " noOfChunks " + noOfChunks);
List<List<String>> chunks = new ArrayList<>();
for (int index = 0; index < count; ++index) {
    if (index == 0 && rest > 0) {
end = rest;
    }else {
end = start + chunkSize;
    }

    List<String> sublist = arrayList.subList(start, end);
    start = end;
    chunks.add(sublist);
    //System.out.println(sublist);
}
return chunks;

    }
    
    public static String getOnlyConditions(final String sql) {
int index = sql.indexOf("WHERE");
return sql.substring(index).replace("WHERE", "(") + ")";
    }
}


Cheers,
Dhanuka

--
Nothing Impossible,Creativity is more important than knowledge.
Reply | Threaded
Open this post in threaded view
|

Re: Flink CEP : Doesn't generate output

Dian Fu
Hi Dhanuka,

From the code you shared, it seems that you're using event time. The processing of elements is triggered by watermark in event time and so you should define how to generate the watermark, i.e with DataStream.assignTimestampsAndWatermarks

Regards,
Dian


在 2019年1月23日,上午1:58,dhanuka ranasinghe <[hidden email]> 写道:

patternStream

Reply | Threaded
Open this post in threaded view
|

Re: Flink CEP : Doesn't generate output

dhanuka.priyanath
Thank you for the clarification. 

On Thu, 24 Jan 2019, 12:44 Dian Fu <[hidden email] wrote:
Hi Dhanuka,

From the code you shared, it seems that you're using event time. The processing of elements is triggered by watermark in event time and so you should define how to generate the watermark, i.e with DataStream.assignTimestampsAndWatermarks

Regards,
Dian


在 2019年1月23日,上午1:58,dhanuka ranasinghe <[hidden email]> 写道:

patternStream

Reply | Threaded
Open this post in threaded view
|

Re: Flink CEP : Doesn't generate output

Chesnay Schepler
In reply to this post by dhanuka.priyanath
Can you provide us a self-contained reproducing example? (preferably as elementary as possible)

On 22.01.2019 18:58, dhanuka ranasinghe wrote:
Hi All,

I have used Flink CEP to filter some events and generate some alerts based on certain conditions. But unfortunately doesn't print any result. I have attached source code herewith, could you please help me on this.

--------------------------------


package org.monitoring.stream.analytics;

import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.shaded.org.apache.commons.lang3.StringUtils;
import org.monitoring.stream.analytics.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MultiMap;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;


public class FlinkCEP {
    private final static Logger LOGGER = LoggerFactory.getLogger(FlinkCEP.class);

    public static void main(String[] args) throws Exception {

String query = FileHandler.readInputStream(FileHandler.getResourceAsStream("query.sql"));
if (query == null) {
    LOGGER.error("*****************  Can't read resources ************************");
} else {
    LOGGER.info("======================== " + query + " =============================");
}
Properties props = FileHandler.loadResourceProperties("application.properties");
Properties kConsumer = FileHandler.loadResourceProperties("consumer.properties");
Properties kProducer = FileHandler.loadResourceProperties("producer.properties");
String hzConfig = FileHandler.readInputStream(FileHandler.getResourceAsStream("hazelcast-client.xml"));
String schemaContent = FileHandler.readInputStream(FileHandler.getResourceAsStream("IRIC-schema.json"));

props.setProperty("auto.offset.reset", "latest");
props.setProperty("flink.starting-position", "latest");
Map<String, String> tempMap = new HashMap<>();
for (final String name : props.stringPropertyNames())
    tempMap.put(name, props.getProperty(name));
final ParameterTool params = ParameterTool.fromMap(tempMap);
String jobName = props.getProperty(ApplicationConfig.JOB_NAME);

LOGGER.info("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Desktop Responsibility Start %%%%%%%%%%%%%%%%%%%%%%");

LOGGER.info("$$$$$$$$$$$$$$$$$$$$$$$ Hz instance name " + props.toString());
HazelcastInstance hzInst = HazelcastUtils.getClient(hzConfig, "");

LOGGER.info("============================== schema " + schemaContent);

MultiMap<String, String> distributedMap = hzInst.getMultiMap("masterDataSynch");
distributedMap.put(jobName, query);

LOGGER.info("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Desktop Responsibility End %%%%%%%%%%%%%%%%%%%%%%%%%");

Collection<String> queries = distributedMap.get(jobName);
Set<String> rules = new HashSet<>(queries);
LOGGER.info("============================== query" + query);
rules.add(query);
hzInst.getLifecycleService().shutdown();
final String sourceTable = "dataTable";

String paral = props.getProperty(ApplicationConfig.FLINK_PARALLEL_TASK);
String noOfOROperatorsValue = props.getProperty(ApplicationConfig.FLINK_NUMBER_OF_OR_OPERATORS);
int noOfOROperators = 50;
if(StringUtils.isNoneBlank(noOfOROperatorsValue)) {
    noOfOROperators = Integer.parseInt(noOfOROperatorsValue);
}
List<List<String>> subQueries = chunk(new ArrayList(rules), noOfOROperators);

// define a schema

// setup streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(300000); // 300 seconds
env.getConfig().setGlobalJobParameters(params);
// env.getConfig().enableObjectReuse();
env.getConfig().setUseSnapshotCompression(true);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
env.setParallelism(Integer.parseInt(paral));
/* env.setStateBackend(new RocksDBStateBackend(env.getStateBackend(), true)); */

FlinkKafkaConsumer010<Event> kafka = new FlinkKafkaConsumer010<>("testin", new EventDeSerializer(), kConsumer);
final SourceFunction<Event> source = kafka;
DataStream<Event> events = env.addSource(source);

DataStream<Event> filteredEvents = events
// partition on the address to make sure equal addresses
// end up in the same state machine flatMap function
.keyBy(Event::getTiggerID);
filteredEvents.print();
//create pattern
       Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
        .where(new SimpleCondition<Event>() {

    private static final long serialVersionUID = 1L;

    @Override
            public boolean filter(Event event) throws Exception {
              return event.getInterceptID().equals(4508724) && event.getProviderInfo().equals("Dhanuka") ;
              //&& event.getLIID().matches(".*193400835.*");
            }
          })
          .or(new SimpleCondition<Event>() {

    private static final long serialVersionUID = 1L;

    @Override
            public boolean filter(Event event) throws Exception {
        return event.getInterceptID().equals(4508724) && event.getProviderInfo().equals("Dhanuka") ;
        //&& event.getLIID().matches(".*193400835.*");
            }
          });
       
        PatternStream<Event> patternStream = CEP.pattern(filteredEvents, pattern);
        
        DataStream<Alert> result = patternStream.select(new PatternSelectFunction<Event, Alert>() {
            private static final long serialVersionUID = 1L;

    @Override
    public Alert select(Map<String, List<Event>> pattern) throws Exception {
return parseMatch(pattern);
    }

    private Alert parseMatch(Map<String, List<Event>> pattern) {
List<Event> events = pattern.get("first");
Alert alert = null;
try {
    alert =  new Alert(JSONUtils.writeListToJsonArray(events));
} catch (IOException e) {
    LOGGER.error(e.getMessage());
    alert = new Alert("[]");
}
return alert;
    }   

        });
        
        FlinkKafkaProducer010<Alert> producer = new FlinkKafkaProducer010<>("testout", new AlertSerializer(), kProducer);
        result.addSink(producer);

        result.print();
       
env.execute(props.getProperty(ApplicationConfig.JOB_NAME));
System.out.println("======================= Successfully Deployed ========================");
    }
        

    public static class MyTime extends ScalarFunction {

private static final long serialVersionUID = 1L;

public MyTime(long timeMills) {
    super();
}

public String eval(long timeMills) {
    return new Timestamp(timeMills).toString();
}

    }

    public static class Now extends ScalarFunction {

private static final long serialVersionUID = 1L;

public Now() {
    super();
}

public String eval() {
    return new Timestamp(System.currentTimeMillis()).toString();
}

    }

    public static List<List<String>> chunk(final List<String> arrayList, final int chunkSize) {

int rest = arrayList.size() % chunkSize;
int noOfChunks = arrayList.size() / chunkSize;
int start = 0;
int end = 0;
int count = (rest == 0 ? noOfChunks: noOfChunks +1);

System.out.println("rest  " + rest + " noOfChunks " + noOfChunks);
List<List<String>> chunks = new ArrayList<>();
for (int index = 0; index < count; ++index) {
    if (index == 0 && rest > 0) {
end = rest;
    }else {
end = start + chunkSize;
    }

    List<String> sublist = arrayList.subList(start, end);
    start = end;
    chunks.add(sublist);
    //System.out.println(sublist);
}
return chunks;

    }
    
    public static String getOnlyConditions(final String sql) {
int index = sql.indexOf("WHERE");
return sql.substring(index).replace("WHERE", "(") + ")";
    }
}


Cheers,
Dhanuka

--
Nothing Impossible,Creativity is more important than knowledge.