Join two kafka topics

Tarek khal
I have two Kafka topics (tracking and rules), and I would like to join the "tracking" DataStream with the "rules" DataStream as data arrives in the "tracking" DataStream.


Here is the result I expect. I only got it by restarting the job, and I would like to get it without restarting:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n12954/Capture.jpg>

Code:
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

// TrackEvent, RulesEvent, Alert, EventDeserializationSchema and RulesDeserializationSchema
// are the poster's own classes (not shown). The CEP calls use the Flink 1.2 signatures.

public static void main(final String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Ingestion time: records get timestamps and watermarks at the source,
    // so the event-time window below effectively uses arrival time
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

    /*---- Kafka: "tracking" topic ----*/
    final Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test");

    FlinkKafkaConsumer09<TrackEvent> consumerTracking =
            new FlinkKafkaConsumer09<TrackEvent>("tracking", new EventDeserializationSchema(), properties);
    final DataStream<TrackEvent> inputEventStream = env.addSource(consumerTracking);

    /*---- Kafka: "rules" topic ----*/
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
    props.setProperty("group.id", "testt2");
    // Only applies when this group has no committed offset: then start from the beginning of the topic
    props.setProperty("auto.offset.reset", "earliest");

    // Create the rules consumer with the rules-specific properties
    FlinkKafkaConsumer09<RulesEvent> consumerRules =
            new FlinkKafkaConsumer09<RulesEvent>("rules", new RulesDeserializationSchema(), props);
    DataStream<RulesEvent> inputRulesStream = env.addSource(consumerRules);

    inputEventStream.print();
    inputRulesStream.print();

    // Window join on imei: a rule only matches tracking events that fall into the same 2-second window
    DataStream<TrackEvent> js = inputEventStream.join(inputRulesStream)
            .where(new KeySelector<TrackEvent, String>() {
                @Override
                public String getKey(TrackEvent trackEvent) throws Exception {
                    return trackEvent.getImei();
                }
            }).equalTo(new KeySelector<RulesEvent, String>() {
                @Override
                public String getKey(RulesEvent rulesEvent) throws Exception {
                    return rulesEvent.getImei();
                }
            }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
            .apply(new JoinFunction<TrackEvent, RulesEvent, TrackEvent>() {
                @Override
                public TrackEvent join(TrackEvent trackEvent, RulesEvent rulesEvent) throws Exception {
                    // Attach the matching rule to the tracking event
                    trackEvent.setRule(rulesEvent);
                    return trackEvent;
                }
            });

    js.printToErr();

    //---- Pattern: temperature at or above the rule's threshold ----
    Pattern<TrackEvent, ?> temperaturePattern = Pattern.<TrackEvent>begin("first")
            .subtype(TrackEvent.class).where(new FilterFunction<TrackEvent>() {
                private static final long serialVersionUID = 1L;

                @Override
                public boolean filter(TrackEvent value) {
                    return value.getTemperature() >= value.getRule().getTemp();
                }
            });

    DataStream<Alert> patternStreamTemperature = CEP.pattern(js, temperaturePattern)
            .select(new PatternSelectFunction<TrackEvent, Alert>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Alert select(Map<String, TrackEvent> event) throws Exception {
                    String imei = event.get("first").getImei();
                    return new Alert(imei, "Temperature Rise Detected on imei " + imei);
                }
            });

    //---- Sinks ----
    patternStreamTemperature.printToErr();

    env.execute();
}

Re: Join two kafka topics

Aljoscha Krettek
Hi,
Instead of a join, I would suggest using a connected FlatMap [1] (or a connected ProcessFunction [2]). The problem with a join is that the rules only “survive” for the length of the window, while I suspect you want them to survive longer so that they can be applied to events arriving in the future.
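To make the window problem concrete, here is a small plain-Java model (no Flink involved; the class, the `Arrival` record and the timestamps are hypothetical). It counts how many tracking events find a rule under a 2-second tumbling-window join, like the one in the original code, versus an operator that keeps the latest rule per imei, which is roughly what a connected FlatMap with keyed state does.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model, not Flink code. Compares a 2-second tumbling-window join with
// an operator that keeps the latest rule per imei. All names are hypothetical.
public class WindowVsStateModel {

    // One arrival on either topic: arrival time in ms, imei, and rule-or-event flag.
    record Arrival(long timestampMs, String imei, boolean isRule) {}

    // Window join: an event matches a rule only if both fall into the same window.
    static int windowJoinMatches(List<Arrival> arrivals, long windowMs) {
        int matches = 0;
        for (Arrival event : arrivals) {
            if (event.isRule()) {
                continue;
            }
            for (Arrival rule : arrivals) {
                boolean sameWindow = rule.timestampMs() / windowMs == event.timestampMs() / windowMs;
                if (rule.isRule() && rule.imei().equals(event.imei()) && sameWindow) {
                    matches++;
                }
            }
        }
        return matches;
    }

    // Stateful enrichment: arrivals are processed in order; the latest rule per imei
    // is remembered and applied to every later event with that imei.
    static int statefulMatches(List<Arrival> arrivals) {
        Map<String, Arrival> latestRule = new HashMap<>();
        int matches = 0;
        for (Arrival a : arrivals) {
            if (a.isRule()) {
                latestRule.put(a.imei(), a);
            } else if (latestRule.containsKey(a.imei())) {
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Arrival> arrivals = List.of(
                new Arrival(500, "01", true),    // rule for "01", window [0, 2000)
                new Arrival(1000, "01", false),  // event in the same window
                new Arrival(3000, "01", false),  // event in window [2000, 4000)
                new Arrival(5000, "01", false)); // event in window [4000, 6000)
        System.out.println("window join matches: " + windowJoinMatches(arrivals, 2000));
        System.out.println("stateful matches:    " + statefulMatches(arrivals));
    }
}
```

With these arrivals, the window join matches only the event at 1000 ms, while the stateful version matches all three events.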

Best,
Aljoscha 


On 2. May 2017, at 18:58, Tarek khal <[hidden email]> wrote:

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Join-two-kafka-topics-tp12954.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.


Re: Join two kafka topics

Tarek khal
Hi Aljoscha,
Thanks for the reply.

I opted for the first solution, a connected FlatMap [1], but how can I simulate the where(key) and equalTo(key) of a "join", given that the function gets separate calls to flatMap1 and flatMap2?

Best regards,
Tarek

Re: Join two kafka topics

Aljoscha Krettek
Hi,
You get that by keying the two input streams on the same key, either by calling keyBy() on each of them individually or by calling keyBy() on the ConnectedStreams.
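A toy model of why keying both inputs on the same field is enough (plain Java; `keyedInstance` is a hypothetical stand-in, since Flink actually hashes keys into key groups). Keyed routing is a deterministic function of the key, so the tracking event and the rule for a given imei always reach the same parallel instance, which is also where the keyed state for that imei lives. Unkeyed round-robin distribution gives no such guarantee.

```java
import java.util.List;

// Toy model of why both inputs must be keyed on the same field. This is not
// Flink's real partitioner; it only shows the property that matters:
// equal keys always land on the same parallel instance.
public class CoPartitionModel {

    // Hypothetical keyed routing: a deterministic function of the key.
    static int keyedInstance(String imei, int parallelism) {
        return Math.floorMod(imei.hashCode(), parallelism);
    }

    // Counts rules that land on the same instance as the (keyed) events for their imei.
    // If keyRules is false, rules are spread round-robin instead (unkeyed).
    static int colocatedRules(List<String> ruleImeis, int parallelism, boolean keyRules) {
        int colocated = 0;
        for (int i = 0; i < ruleImeis.size(); i++) {
            String imei = ruleImeis.get(i);
            int ruleInstance = keyRules ? keyedInstance(imei, parallelism) : i % parallelism;
            if (ruleInstance == keyedInstance(imei, parallelism)) {
                colocated++;
            }
        }
        return colocated;
    }

    public static void main(String[] args) {
        List<String> rules = List.of("01", "02", "03");
        System.out.println("keyed:       " + colocatedRules(rules, 4, true) + " of 3");
        System.out.println("round-robin: " + colocatedRules(rules, 4, false) + " of 3");
    }
}
```

With parallelism 4 and rules for "01", "02" and "03", the keyed routing co-locates all three rules with their events, while round-robin happens to co-locate none of them.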

Best,
Aljoscha

On 3. May 2017, at 13:06, Tarek khal-letaief <[hidden email]> wrote:


Re: Join two kafka topics

Tarek khal
Hi Aljoscha,

I tried ConnectedStreams with a CoFlatMapFunction as you suggested, but the result is not what I expected.

Test steps:

1) I added 3 rules to the "rules" topic (imei: "01", "02", "03")
2) I sent 15 events with different imeis, but I guess I have a problem with "keyBy"

Result:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n12998/2222222.jpg>

Code:
        // Key both inputs by their "imei" field so that matching records reach the same parallel instance
        ConnectedStreams<TrackEvent, RulesEvent> connectedStreams = inputEventStream.connect(inputRulesStream).keyBy("imei", "imei");
        DataStream<Tuple2<TrackEvent, RulesEvent>> ds = connectedStreams.flatMap(new CoFlatMapFunction<TrackEvent, RulesEvent, Tuple2<TrackEvent, RulesEvent>>() {
            // Ordinary field: shared by every key this parallel instance processes (not keyed state)
            Tuple2<TrackEvent, RulesEvent> t2 = new Tuple2<TrackEvent, RulesEvent>();

            @Override
            public void flatMap1(TrackEvent trackEvent, Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
                // Tracking event: emit it together with whichever rule was stored last
                t2.f0 = trackEvent;
                collector.collect(t2);
                // t2=new Tuple2<TrackEvent, RulesEvent>();
            }

            @Override
            public void flatMap2(RulesEvent rulesEvent, Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
                // Rule: overwrite the stored rule, whatever its imei
                t2.f1 = rulesEvent;
                //collector.collect(t2);
            }
        });
        ds.printToErr();
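One likely cause of the unexpected pairs is `t2`, not `keyBy`: it is an ordinary field of the function object, so a parallel instance shares it across all the keys it handles, and each tracking event is paired with whichever rule that instance received last. Keyed state, for example a `ValueState<RulesEvent>` obtained from the runtime context in a `RichCoFlatMapFunction`, is scoped to the current key instead. The plain-Java model below (hypothetical names, no Flink) mimics one instance that receives the rules for "01", "02" and "03" and then a tracking event for "01".

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of one parallel CoFlatMap instance (hypothetical names).
// onRule plays the role of flatMap2, onEvent the role of flatMap1.
public class SharedFieldVsKeyedState {

    // Stand-in for RulesEvent: an imei and a temperature threshold.
    record Rule(String imei, double maxTemp) {}

    // Buggy variant: a single field shared by all keys, like `t2` in the question.
    static class SharedField {
        private Rule lastRule;

        void onRule(Rule rule) { lastRule = rule; }

        // Ignores the imei, exactly as t2 does
        Rule onEvent(String imei) { return lastRule; }
    }

    // Keyed variant: one slot per imei, mimicking Flink keyed state.
    static class PerKeyState {
        private final Map<String, Rule> ruleByImei = new HashMap<>();

        void onRule(Rule rule) { ruleByImei.put(rule.imei(), rule); }

        // Returns null if no rule has arrived yet for this imei
        Rule onEvent(String imei) { return ruleByImei.get(imei); }
    }

    public static void main(String[] args) {
        SharedField shared = new SharedField();
        PerKeyState keyed = new PerKeyState();
        // Three rules arrive, then a tracking event for imei "01"
        for (Rule r : new Rule[] {new Rule("01", 30.0), new Rule("02", 40.0), new Rule("03", 50.0)}) {
            shared.onRule(r);
            keyed.onRule(r);
        }
        System.out.println("shared field: " + shared.onEvent("01"));
        System.out.println("keyed state:  " + keyed.onEvent("01"));
    }
}
```

Here the shared field pairs the "01" event with the rule for "03", while the per-key map returns the rule for "01".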

Best,


Re: Join two kafka topics

Kostas Kloudas
Hi Tarek,

This question seems to be a duplicate of your other question “ConnectedStream keyBy issues”, right?
I am just asking for clarification.

Thanks,
Kostas

> On May 4, 2017, at 1:41 PM, Tarek khal <[hidden email]> wrote:

Re: Join two kafka topics

Tarek khal
Hi Kostas,

Yes, it is now solved, with the help of Jason.

Best,

Re: Join two kafka topics

Kostas Kloudas
Perfect!
Thanks a lot for the clarification!

Kostas

> On May 4, 2017, at 4:37 PM, Tarek khal <[hidden email]> wrote: