Join two kafka topics to do CEP


Tarek khal
I want to join two Kafka topics (Data, Rules) into one DataStream.

The two streams must have the same id for the join to match.
Events are the data coming from the sensors.
Rules contains the rules that we will check with CEP.
Here is my test, but it does not work as data arrives; I have to re-run the job. Can anyone help me please?

       
  inputEventStream.join(inputRulesStream) 
                 .where(new KeySelector<TrackEvent, String>() { 
                     @Override 
                     public String getKey(TrackEvent trackEvent) throws Exception { 

                         return trackEvent.getImei(); 
                     } 
                 }).equalTo(new KeySelector<RulesEvent, String>() { 
                     @Override 
                     public String getKey(RulesEvent rulesEvent) throws Exception { 
                         return rulesEvent.getImei(); 
                     } 
                 }).window(TumblingEventTimeWindows.of(Time.seconds(3))) 
                 .apply(new JoinFunction<TrackEvent, RulesEvent, TrackEvent>() { 
                     @Override 
                     public TrackEvent join(TrackEvent trackEvent, RulesEvent rulesEvent) throws Exception { 
                         trackEvent.setRule(rulesEvent); 
                         return trackEvent; 
                     } 
                 });
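
One possible reason the join only produces output after a restart (an assumption, not something confirmed in this thread): a window join pairs only elements that share a key and fall into the same window. If a rule is read long before the tracking events for the same IMEI, the two land in different 3-second windows and are never paired. The plain-Java sketch below illustrates that window assignment, assuming windows aligned to the epoch; the class and method names are hypothetical and it does not use the Flink API.

```java
final class TumblingWindowJoinDemo {
    // Start of the epoch-aligned tumbling window that contains the timestamp.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Two elements with the same key can only be paired if they share a window.
    static boolean sameWindow(long tsA, long tsB, long windowSizeMillis) {
        return windowStart(tsA, windowSizeMillis) == windowStart(tsB, windowSizeMillis);
    }

    public static void main(String[] args) {
        long window = 3_000L; // 3 seconds, as in the snippet above
        // Rule at 1.0 s, event at 2.5 s: both in window [0 s, 3 s).
        System.out.println(sameWindow(1_000L, 2_500L, window));  // true
        // Rule at 1.0 s, event at 10 s: windows [0 s, 3 s) and [9 s, 12 s).
        System.out.println(sameWindow(1_000L, 10_000L, window)); // false
    }
}
```

If this is what happens, a rule published once would match only the events that arrive within the same few seconds, which may be why results show up only when both topics are re-read together.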
Re: Join two kafka topics to do CEP

Tzu-Li (Gordon) Tai
Hi,

Here is my test, but it does not work as data arrives; I have to re-run the job. Can
anyone help me please?

I think you meant to send a code snippet? Either way, a code snippet would probably help in understanding what you’re trying to achieve :)

You mentioned “re-run and no data”, so one thing I can point out now: the Kafka consumer commits offsets back to Kafka / ZK for the consumer group (“group.id”) you’re currently using. So if in your tests you are simply restarting the job, make sure you use a different consumer group if you want to re-read previous data in the topics.
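
As a rough illustration of the point about consumer groups, here is a minimal sketch that builds consumer properties with a new "group.id" on every run, so no committed offsets exist for the group yet. The helper name freshGroup and the UUID suffix are assumptions for illustration only; "bootstrap.servers", "group.id" and "auto.offset.reset" are standard Kafka consumer settings.

```java
import java.util.Properties;
import java.util.UUID;

final class FreshGroupProperties {
    // Builds consumer properties with a unique group.id (hypothetical helper).
    static Properties freshGroup(String bootstrapServers, String groupPrefix) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        // A new group has no committed offsets, so nothing is skipped on restart.
        p.setProperty("group.id", groupPrefix + "-" + UUID.randomUUID());
        // With no committed offset for the group, start at the beginning of the topic.
        p.setProperty("auto.offset.reset", "earliest");
        return p;
    }

    public static void main(String[] args) {
        Properties p = freshGroup("localhost:9092", "test");
        System.out.println(p.getProperty("group.id"));
    }
}
```

The resulting Properties object could then be passed to the Kafka consumer in place of one with a fixed group. This is only meant for tests; a production job would normally keep a stable group.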

Cheers,
Gordon


On 28 April 2017 at 12:20:17 AM, tarek26 ([hidden email]) wrote:

I want to join two Kafka topics (Data, Rules) into one DataStream.

The two streams must have the same id for the join to match.
Events are the data coming from the sensors.
Rules contains the rules that we will check with CEP.
Here is my test, but it does not work as data arrives; I have to re-run the job. Can
anyone help me please?

Re: Join two kafka topics to do CEP

Tarek khal
Hi Gordon,

Thank you for your help. Maybe I did not explain it well.

I have two Kafka topics (tracking and rules), and I would like to join the "tracking" DataStream with the "rules" DataStream as data arrives in the "tracking" DataStream.


Here is the result that I expect. I had to restart the job to get it, but I want to get it without restarting:



Code:


    public static void main(final String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        final Properties properties = new Properties();
        /*----Kafka------*/
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer09<TrackEvent> consumertracking=new FlinkKafkaConsumer09<TrackEvent>("tracking", new EventDeserializationSchema(), properties);
        final DataStream<TrackEvent> inputEventStream = env.addSource(consumertracking);

        // configure the Kafka consumer for the "rules" topic
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
        props.setProperty("group.id", "testt2");
        props.setProperty("auto.offset.reset", "latest");       // With no committed offset, start from the newest records

        // create a Kafka consumer for the rules topic, using the props defined above
        FlinkKafkaConsumer09<RulesEvent> consumerRules = new FlinkKafkaConsumer09<RulesEvent>("rules", new RulesDeserializationSchema(), props);

        DataStream<RulesEvent> inputRulesStream = env.addSource(consumerRules);

        inputEventStream.print();

        inputRulesStream.print();

        DataStream<TrackEvent> js=inputEventStream.join(inputRulesStream)
                .where(new KeySelector<TrackEvent, String>() {
                    @Override
                    public String getKey(TrackEvent trackEvent) throws Exception {
                        return trackEvent.getImei();
                    }
                }).equalTo(new KeySelector<RulesEvent, String>() {
                    @Override
                    public String getKey(RulesEvent rulesEvent) throws Exception {
                        return rulesEvent.getImei();
                    }
                }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .apply(new JoinFunction<TrackEvent, RulesEvent, TrackEvent>() {
                    @Override
                    public TrackEvent join(TrackEvent trackEvent, RulesEvent rulesEvent) throws Exception {
                        trackEvent.setRule(rulesEvent);
                        return trackEvent;
                    }
                });

        js.printToErr();

        //----Pattern Temperature--------------------------------------------------------------------------------------
        Pattern<TrackEvent, ?> temperaturePattern = Pattern.<TrackEvent> begin("first")
                .subtype(TrackEvent.class).where(new FilterFunction<TrackEvent>() {
                    private static final long serialVersionUID = 1L;

                    public boolean filter(TrackEvent value) {
                        return value.getTemperature() >= value.getRule().getTemp();
                    }
                });
        DataStream<Alert> patternStreamTemperature = CEP.pattern(js, temperaturePattern)
                .select(new PatternSelectFunction<TrackEvent, Alert>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Alert select(Map<String, TrackEvent> event) throws Exception {
                        // build the alert text for the matched "first" event
                        String imei = event.get("first").getImei();
                        String alertsMessage = "Temperature Rise Detected on imei " + imei;
                        return new Alert(imei, alertsMessage);
                    }
                });

        //----Sinks-----------------------------------------------------------------------------------------------------

        patternStreamTemperature.printToErr();



        env.execute();

    }
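
For the goal described above (enrich each tracking event with its rule as soon as the event arrives), one alternative to a window join is to remember the latest rule per IMEI and attach it to every incoming event. The sketch below only simulates that logic in plain Java; it is not the code from this thread and it does not use Flink. Rule and Track are hypothetical stand-ins for RulesEvent and TrackEvent. In Flink, a similar idea might be expressed with connected streams keyed by IMEI and a stateful co-function, but that mapping is an assumption here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RuleEnrichmentSketch {
    // Hypothetical stand-in for RulesEvent.
    static final class Rule {
        final String imei;
        final double temp;
        Rule(String imei, double temp) { this.imei = imei; this.temp = temp; }
    }

    // Hypothetical stand-in for TrackEvent.
    static final class Track {
        final String imei;
        final double temperature;
        Rule rule; // set once a rule is known for this IMEI
        Track(String imei, double temperature) { this.imei = imei; this.temperature = temperature; }
    }

    private final Map<String, Rule> latestRuleByImei = new HashMap<>();
    private final List<Track> enriched = new ArrayList<>();

    // Called for every record from the "rules" topic: keep only the newest rule per IMEI.
    void onRule(Rule rule) {
        latestRuleByImei.put(rule.imei, rule);
    }

    // Called for every record from the "tracking" topic: attach the stored rule, if any.
    void onTrack(Track track) {
        Rule rule = latestRuleByImei.get(track.imei);
        if (rule != null) {
            track.rule = rule;
            enriched.add(track);
        }
        // Events that arrive before any rule are dropped in this sketch.
    }

    List<Track> enriched() {
        return enriched;
    }

    public static void main(String[] args) {
        RuleEnrichmentSketch s = new RuleEnrichmentSketch();
        s.onTrack(new Track("imei-1", 30.0)); // no rule yet, dropped
        s.onRule(new Rule("imei-1", 25.0));
        s.onTrack(new Track("imei-1", 31.0)); // enriched with the stored rule
        System.out.println(s.enriched().size()); // 1
    }
}
```

Dropping events that arrive before their rule keeps the sketch short; buffering them until a rule shows up would be another option, depending on what the alerts require.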