I have two Kafka topics ("tracking" and "rules") and I would like to join the "tracking" datastream with the "rules" datastream as data arrives in the "tracking" datastream.
Here is the result I expect, but I only got it by restarting the job; I would like to get it without restarting.

Code:

    public static void main(final String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        /*----Kafka------*/
        final Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        FlinkKafkaConsumer09<TrackEvent> consumertracking =
                new FlinkKafkaConsumer09<TrackEvent>("tracking", new EventDeserializationSchema(), properties);
        final DataStream<TrackEvent> inputEventStream = env.addSource(consumertracking);

        // configure Kafka consumer for the "rules" topic
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
        props.setProperty("group.id", "testt2");
        // only used when the group has no committed offset; "latest" starts at the end of the topic
        props.setProperty("auto.offset.reset", "latest");

        // create a Kafka consumer for "rules" with its own props
        FlinkKafkaConsumer09<RulesEvent> consumerRules =
                new FlinkKafkaConsumer09<RulesEvent>("rules", new RulesDeserializationSchema(), props);
        DataStream<RulesEvent> inputRulesStream = env.addSource(consumerRules);

        inputEventStream.print();
        inputRulesStream.print();

        // join tracking events with rules that have the same imei in the same 2-second window
        DataStream<TrackEvent> js = inputEventStream.join(inputRulesStream)
                .where(new KeySelector<TrackEvent, String>() {
                    @Override
                    public String getKey(TrackEvent trackEvent) throws Exception {
                        return trackEvent.getImei();
                    }
                })
                .equalTo(new KeySelector<RulesEvent, String>() {
                    @Override
                    public String getKey(RulesEvent rulesEvent) throws Exception {
                        return rulesEvent.getImei();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .apply(new JoinFunction<TrackEvent, RulesEvent, TrackEvent>() {
                    @Override
                    public TrackEvent join(TrackEvent trackEvent, RulesEvent rulesEvent) throws Exception {
                        trackEvent.setRule(rulesEvent);
                        return trackEvent;
                    }
                });
        js.printToErr();

        //----Pattern Temperature----------------------------------------------
        Pattern<TrackEvent, ?> temperaturePattern = Pattern.<TrackEvent>begin("first")
                .subtype(TrackEvent.class)
                .where(new FilterFunction<TrackEvent>() {
                    private static final long serialVersionUID = 1L;

                    public boolean filter(TrackEvent value) {
                        return value.getTemperature() >= value.getRule().getTemp();
                    }
                });

        DataStream<Alert> patternStreamTemperature = CEP.pattern(js, temperaturePattern)
                .select(new PatternSelectFunction<TrackEvent, Alert>() {
                    private static final long serialVersionUID = 1L;
                    String alertsMessage = "";

                    public Alert select(Map<String, TrackEvent> event) throws Exception {
                        alertsMessage = "Temperature Rise Detected on imei " + event.get("first").getImei();
                        return new Alert(event.get("first").getImei(), alertsMessage + "]");
                    }
                });

        //----Sinks------------------------------------------------------------
        patternStreamTemperature.printToErr();
        env.execute();
    }
Hi,
Instead of a join, I would suggest using a connected FlatMap [1] (or a connected ProcessFunction [2]). The problem with a join is that the rules only “survive” for the length of the window, while I suspect you want them to survive longer so that they can be applied to events arriving in the future.

Best,
Aljoscha
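To illustrate the difference, here is a minimal plain-Java model (not Flink code; the `WindowVsStateDemo`, `Element`, `Rule` and `Track` names are hypothetical). It only counts which track events would find a rule: a tumbling-window join pairs a rule and an event only if they share the imei and land in the same window, while keeping the latest rule per imei applies it to every later event.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model, not Flink code: it only counts which track
// events would find a matching rule under each strategy.
class WindowVsStateDemo {
    abstract static class Element {
        final String imei;
        final long ts; // non-negative timestamp in ms (ingestion time in the original job)

        Element(String imei, long ts) {
            this.imei = imei;
            this.ts = ts;
        }
    }

    static final class Rule extends Element {
        Rule(String imei, long ts) { super(imei, ts); }
    }

    static final class Track extends Element {
        Track(String imei, long ts) { super(imei, ts); }
    }

    // Tumbling-window join: a rule and a track event match only if they share
    // the imei AND fall into the same window [k * size, (k + 1) * size).
    static int windowJoinCount(List<Element> elements, long windowSizeMs) {
        int matches = 0;
        for (Element t : elements) {
            if (!(t instanceof Track)) continue;
            for (Element r : elements) {
                if (r instanceof Rule && t.imei.equals(r.imei)
                        && t.ts / windowSizeMs == r.ts / windowSizeMs) {
                    matches++;
                }
            }
        }
        return matches;
    }

    // Stateful alternative: remember the latest rule per imei (in arrival order)
    // and apply it to every later track event with that imei.
    static int statefulJoinCount(List<Element> arrivals) {
        Map<String, Rule> latestRule = new HashMap<>();
        int matches = 0;
        for (Element e : arrivals) {
            if (e instanceof Rule) {
                latestRule.put(e.imei, (Rule) e);
            } else if (latestRule.containsKey(e.imei)) {
                matches++;
            }
        }
        return matches;
    }
}
```

For example, with a rule for "01" at 100 ms and "01" track events at 500 ms and 3000 ms, a 2000 ms window join finds one match (the 3000 ms event is in the next window), while the stateful version finds two.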
Hi Aljoscha,
Thanks for the reply. I opted for the first solution, a connected FlatMap [1], but how can I simulate the where(key) and equalTo(key) of a join, given that the function gets individual calls to flatMap1 and flatMap2?

Best regards,
Tarek
In reply to this post by Aljoscha Krettek
Hi,
You get that by keying the two input streams on the same key, either by calling keyBy() on each of them individually or by calling keyBy() on the ConnectedStreams.

Best,
Aljoscha
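A minimal plain-Java model of this idea (hypothetical names, not Flink's API): both inputs are routed by imei, so a track event and a rule with the same imei reach the same "subtask", and each subtask keeps rules in a map keyed by imei, standing in for Flink's keyed state. The modulo routing is a simplification of Flink's key-group hashing; what matters is only that it is deterministic per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model, NOT Flink code, of connect(...).keyBy(imei, imei)
// followed by a CoFlatMap. Each simulated subtask holds a map imei -> rule
// threshold, standing in for keyed state scoped to the current key.
class KeyedConnectModel {
    private final List<Map<String, Double>> rulesPerSubtask = new ArrayList<>();
    final List<String> output = new ArrayList<>();

    KeyedConnectModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            rulesPerSubtask.add(new HashMap<>());
        }
    }

    // Simplified keyBy routing: the same imei always reaches the same subtask,
    // whichever input it came from.
    private Map<String, Double> subtaskFor(String imei) {
        return rulesPerSubtask.get(Math.floorMod(imei.hashCode(), rulesPerSubtask.size()));
    }

    // Plays the role of flatMap2: store (or replace) the rule for this imei.
    void onRule(String imei, double maxTemp) {
        subtaskFor(imei).put(imei, maxTemp);
    }

    // Plays the role of flatMap1: only the rule stored under the SAME imei is
    // visible, which is what where(...).equalTo(...) provided in the join.
    void onTrack(String imei, double temperature) {
        Double maxTemp = subtaskFor(imei).get(imei);
        if (maxTemp != null) {
            output.add(imei + (temperature >= maxTemp ? " ALERT" : " ok"));
        }
    }
}
```

Feeding rules for "01" (30.0) and "02" (40.0), then track events "02" at 45.0, "09" at 99.0 and "01" at 25.0 yields `02 ALERT` and `01 ok`; "09" produces nothing because no rule exists for that key.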
Hi Aljoscha,
I tested ConnectedStreams with a CoFlatMapFunction as you suggested, but the result is not what I expected.

For the execution:
1) I added 3 rules to the "rules" topic (imei: "01", "02", "03").
2) I sent 15 events with different imeis, but I guess I have a problem with keyBy.

Result: (posted as an image)

Code:

    ConnectedStreams<TrackEvent, RulesEvent> connectedStreams =
            inputEventStream.connect(inputRulesStream).keyBy("imei", "imei");

    DataStream<Tuple2<TrackEvent, RulesEvent>> ds = connectedStreams.flatMap(
            new CoFlatMapFunction<TrackEvent, RulesEvent, Tuple2<TrackEvent, RulesEvent>>() {
                Tuple2<TrackEvent, RulesEvent> t2 = new Tuple2<TrackEvent, RulesEvent>();

                @Override
                public void flatMap1(TrackEvent trackEvent, Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
                    t2.f0 = trackEvent;
                    collector.collect(t2);
                    // t2 = new Tuple2<TrackEvent, RulesEvent>();
                }

                @Override
                public void flatMap2(RulesEvent rulesEvent, Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
                    t2.f1 = rulesEvent;
                    // collector.collect(t2);
                }
            });
    ds.printToErr();

Best,
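One plausible cause of the mismatched pairs (the thread does not say what the final fix was) is that t2 is an ordinary field of the function object. keyBy decides which parallel instance receives a record, but each instance serves many keys, so t2.f1 just holds the last rule that instance saw, whatever its imei. A hypothetical plain-Java model of the posted function, assuming all rules reach the same instance (e.g. parallelism 1):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model, NOT Flink code, of the CoFlatMapFunction above.
// The field is per function instance, not per key, so every imei shares it.
class SharedFieldModel {
    private String lastRuleImei;              // plays the role of t2.f1
    final List<String> emitted = new ArrayList<>();

    // Like flatMap2: t2.f1 = rulesEvent; overwrites the rule for ALL keys.
    void flatMap2(String ruleImei) {
        lastRuleImei = ruleImei;
    }

    // Like flatMap1: t2.f0 = trackEvent; collect(t2); pairs with whatever rule came last.
    void flatMap1(String trackImei) {
        emitted.add(trackImei + " paired with rule " + lastRuleImei);
    }
}
```

With rules "01", "02", "03" followed by track events "01" and "07", both events are paired with rule "03". In Flink, the per-key alternative is keyed state, for example a RichCoFlatMapFunction that obtains a `ValueState<RulesEvent>` via `getRuntimeContext().getState(new ValueStateDescriptor<>("rule", RulesEvent.class))` in `open()`, calls `update()` in flatMap2 and reads `value()` (null until a rule for that key has arrived) in flatMap1.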
Hi Tarek,
This question seems to be a duplicate of your other question, “ConnectedStream keyBy issues”, right? I am just asking for clarification.

Thanks,
Kostas

> On May 4, 2017, at 1:41 PM, Tarek khal <[hidden email]> wrote:
> [...]
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Join-two-kafka-topics-tp12954p12998.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Hi Kostas,
Yes, it is now solved with the help of Jason.

Best,
Perfect!
Thanks a lot for the clarification!

Kostas

> On May 4, 2017, at 4:37 PM, Tarek khal <[hidden email]> wrote:
> [...]