I want to join two Kafka topics (Data, Rules) into a single DataStream.
The two DataStreams must have the same id for the join to match. Events are the data coming from the sensors; Rules contains the rules that we will check with CEP. Here is my test, but it does not work as data arrives: I have to re-run the job. Can anyone help me, please?

    inputEventStream.join(inputRulesStream)
        .where(new KeySelector<TrackEvent, String>() {
            @Override
            public String getKey(TrackEvent trackEvent) throws Exception {
                return trackEvent.getImei();
            }
        })
        .equalTo(new KeySelector<RulesEvent, String>() {
            @Override
            public String getKey(RulesEvent rulesEvent) throws Exception {
                return rulesEvent.getImei();
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply(new JoinFunction<TrackEvent, RulesEvent, TrackEvent>() {
            @Override
            public TrackEvent join(TrackEvent trackEvent, RulesEvent rulesEvent) throws Exception {
                trackEvent.setRule(rulesEvent);
                return trackEvent;
            }
        });
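A note on the windowed join above: a tumbling-window join pairs elements only when they share a key and their timestamps fall into the same window, so a rule and a sensor event that arrive a few seconds apart may never be paired. The following is a minimal plain-Java sketch of that matching condition, not Flink code. The class and method names are hypothetical, and it assumes non-negative millisecond timestamps and a window offset of zero.

```java
class TumblingWindowJoinSketch {

    // Start of the tumbling window that contains timestampMs
    // (assumes offset 0 and non-negative timestamps).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Two elements are paired only if the keys are equal
    // and both timestamps land in the same window.
    static boolean wouldJoin(String keyA, long tsA, String keyB, long tsB, long windowSizeMs) {
        return keyA.equals(keyB)
                && windowStart(tsA, windowSizeMs) == windowStart(tsB, windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 3000L; // 3-second windows, as in the snippet above

        // Rule at 1.0 s, event at 2.5 s: both in window [0 s, 3 s)
        System.out.println(wouldJoin("imei-1", 1000L, "imei-1", 2500L, size)); // true

        // Rule at 1.0 s, event at 7.0 s: windows [0 s, 3 s) and [6 s, 9 s)
        System.out.println(wouldJoin("imei-1", 1000L, "imei-1", 7000L, size)); // false
    }
}
```

If rules are published rarely while sensor events keep arriving, most events fall into windows that contain no rule, and an inner join emits nothing for them.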
Hi,

> Here is my test but it does not work as data arrives i have to re-run, can

I think you meant to send a code snippet? Either way, a code snippet would probably help in understanding what you're trying to achieve :)

You mentioned "re-run and no data", so one thing I can point out now: the Kafka consumer commits offsets back to Kafka / ZK for the consumer group ("group.id") you're currently using. So, if in your tests you are simply restarting the job, make sure you use a different consumer group if you want to read previous data in the topics.

Cheers,
Gordon

On 28 April 2017 at 12:20:17 AM, tarek26 ([hidden email]) wrote:
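To illustrate the consumer-group point in the reply above, here is a minimal sketch of building consumer properties with a fresh "group.id" on every test run. The class and method names are hypothetical. It assumes a Kafka 0.9+ consumer, where a group with no committed offsets falls back to "auto.offset.reset", and "earliest" means reading from the oldest available record.

```java
import java.util.Properties;
import java.util.UUID;

class FreshConsumerGroupSketch {

    // Builds consumer properties whose group.id differs on every call,
    // so no previously committed offsets apply to the new run.
    static Properties consumerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Assumption: a random suffix is acceptable for test runs only.
        props.setProperty("group.id", "test-" + UUID.randomUUID());
        // With no committed offsets for this group, start from the oldest record.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("localhost:9092");
        System.out.println(props.getProperty("group.id"));
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

A random group id suits repeated test runs; a long-running job would normally keep a stable group id so that it can resume from its committed offsets.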
Hi Gordon,
Thank you for your help; maybe I have not explained it well. I have two Kafka topics (tracking and rules), and I would like to join the "tracking" DataStream with the "rules" DataStream as data arrives in the "tracking" DataStream. I can get the result that I expect, but only by restarting the job; I want to get it without restarting.

Code:

    public static void main(final String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        final Properties properties = new Properties();
        /*----Kafka------*/
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        FlinkKafkaConsumer09<TrackEvent> consumertracking = new FlinkKafkaConsumer09<TrackEvent>(
                "tracking", new EventDeserializationSchema(), properties);
        final DataStream<TrackEvent> inputEventStream = env.addSource(consumertracking);

        // configure Kafka consumer
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
        props.setProperty("group.id", "testt2");
        props.setProperty("auto.offset.reset", "latest"); // Always read topic from start
        // create a Kafka consumer
        FlinkKafkaConsumer09<RulesEvent> consumerRules = new FlinkKafkaConsumer09<RulesEvent>(
                "rules", new RulesDeserializationSchema(), properties);
        DataStream<RulesEvent> inputRulesStream = env.addSource(consumerRules);

        inputEventStream.print();
        inputRulesStream.print();

        DataStream<TrackEvent> js = inputEventStream.join(inputRulesStream)
            .where(new KeySelector<TrackEvent, String>() {
                @Override
                public String getKey(TrackEvent trackEvent) throws Exception {
                    return trackEvent.getImei();
                }
            })
            .equalTo(new KeySelector<RulesEvent, String>() {
                @Override
                public String getKey(RulesEvent rulesEvent) throws Exception {
                    return rulesEvent.getImei();
                }
            })
            .window(TumblingEventTimeWindows.of(Time.seconds(2)))
            .apply(new JoinFunction<TrackEvent, RulesEvent, TrackEvent>() {
                @Override
                public TrackEvent join(TrackEvent trackEvent, RulesEvent rulesEvent) throws Exception {
                    trackEvent.setRule(rulesEvent);
                    return trackEvent;
                }
            });
        js.printToErr();

        //----Pattern Temperature----
        Pattern<TrackEvent, ?> temperaturePattern = Pattern.<TrackEvent> begin("first")
            .subtype(TrackEvent.class)
            .where(new FilterFunction<TrackEvent>() {
                private static final long serialVersionUID = 1L;

                public boolean filter(TrackEvent value) {
                    return value.getTemperature() >= value.getRule().getTemp();
                }
            });

        DataStream<Alert> patternStreamTemperature = CEP.pattern(js, temperaturePattern)
            .select(new PatternSelectFunction<TrackEvent, Alert>() {
                private static final long serialVersionUID = 1L;
                String alertsMessage = "";

                public Alert select(Map<String, TrackEvent> event) throws Exception {
                    alertsMessage = "Temperature Rise Detected on imei " + event.get("first").getImei();
                    return new Alert(event.get("first").getImei(), alertsMessage + "]");
                }
            });

        //----Sinks----
        patternStreamTemperature.printToErr();

        env.execute();
    }
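The behaviour asked for here, attaching the current rule to each tracking event as soon as it arrives, can be pictured as a per-key lookup rather than a windowed join. The sketch below is plain Java, not Flink code, and it is not a fix confirmed anywhere in this thread. All names are hypothetical, and a rule is reduced to a single temperature threshold as a stand-in for RulesEvent. In Flink, this kind of per-key storage is usually kept in keyed state on connected streams, which is not shown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class LatestRuleEnrichmentSketch {

    // Latest temperature threshold seen for each IMEI
    // (hypothetical stand-in for the RulesEvent objects in the thread).
    private final Map<String, Double> latestTempRuleByImei = new HashMap<>();

    // Called for every record from the "rules" side: remember the newest rule.
    void onRule(String imei, double maxTemp) {
        latestTempRuleByImei.put(imei, maxTemp);
    }

    // Called for every record from the "tracking" side.
    // Returns empty if no rule is known yet for this IMEI,
    // otherwise whether the reading reaches the rule's threshold.
    Optional<Boolean> onTracking(String imei, double temperature) {
        Double maxTemp = latestTempRuleByImei.get(imei);
        if (maxTemp == null) {
            return Optional.empty();
        }
        return Optional.of(temperature >= maxTemp);
    }

    public static void main(String[] args) {
        LatestRuleEnrichmentSketch sketch = new LatestRuleEnrichmentSketch();
        sketch.onRule("imei-1", 30.0);

        System.out.println(sketch.onTracking("imei-1", 35.0)); // Optional[true]
        System.out.println(sketch.onTracking("imei-1", 20.0)); // Optional[false]
        System.out.println(sketch.onTracking("imei-2", 50.0)); // Optional.empty
    }
}
```

Because the rule is remembered until a newer one replaces it, a tracking event can be checked no matter how long ago the rule arrived, which a 2-second tumbling window does not allow.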