How to perform multiple stream join functionality


prateekarora
Hi

I am trying to port my Spark application to Flink.

In Spark I used the following to join multiple streams:
 
   val stream=stream1.join(stream2).join(stream3).join(stream4)


As I understand it, Flink requires a window operation for a stream join because, unlike Spark, it does not work on RDDs.

So I tried the code below to port my Spark code to Flink, but I don't know whether it is the right way to implement a join across multiple streams.

   val stream_join_1 = stream1.join(stream2)
     .where(_.getField(0).toString())
     .equalTo(_.getField(0).toString())
     .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
     .apply { (l, r) =>
       (l.getField(0).toString(), (l.getField(1).toString(), r.getField(1).toString()))
     }

   val stream_join_2 = stream3.join(stream4)
     .where(_.getField(0).toString())
     .equalTo(_.getField(0).toString())
     .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
     .apply { (l, r) =>
       (l.getField(0).toString(), (l.getField(1).toString(), r.getField(1).toString()))
     }

   val stream = stream_join_1.join(stream_join_2)
     .where(_._1)
     .equalTo(_._1)
     .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
     .apply { (l, r) =>
       (l._1, (((l._2._1, l._2._2), r._2._1), r._2._2))
     }


Please help me find the right approach.


Regards
Prateek

Re: How to perform multiple stream join functionality

Aljoscha Krettek
Hi,
I think it is currently the right approach. I hope that we can in the future provide APIs to make such cases more pleasant to implement.

Cheers,
Aljoscha

On Wed, 25 May 2016 at 22:13 prateekarora <[hidden email]> wrote:
> [...]

Re: How to perform multiple stream join functionality

prateekarora
Hi

Thanks for the information. It would be good if a future release provided an API that makes such use cases more pleasant to implement.

Regards
Prateek

Re: How to perform multiple stream join functionality

yunfan123
In reply to this post by Aljoscha Krettek
In Flink release 1.3, can I do this in a simple way?

Re: How to perform multiple stream join functionality

Aljoscha Krettek
Hi,

I’m afraid there is also no simple, built-in feature for doing this in Flink 1.3.

Best,
Aljoscha

> On 27. Jun 2017, at 10:37, yunfan123 <[hidden email]> wrote:
>
> In Flink release 1.3, can I do this in a simple way?

Re: How to perform multiple stream join functionality

yunfan123
Flink 1.3? I'm using Flink 1.3; how can I implement this?

Re: How to perform multiple stream join functionality

Aljoscha Krettek
What is the case that you’re trying to implement? For joining two streams you can use a CoProcessFunction, i.e.

stream1.connect(stream2).process(new MyCoProcessFunction())
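
A minimal sketch of what such a CoProcessFunction could look like, assuming both streams carry (key, value) pairs of strings and are keyed by the first field before process is called (keyed state is only available on keyed streams). The names BufferingJoin, TwoStreamJoinSketch, "left" and "right", and the element types are hypothetical and not from this thread. The code is written against the abstract-class form of CoProcessFunction in Flink 1.3; later releases may need adjustments. It buffers every element indefinitely, so a real job would also need timers or some other cleanup.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

object TwoStreamJoinSketch {
  type In = (String, String)            // (key, value); assumed element shape
  type Out = (String, (String, String)) // (key, (left value, right value))

  class BufferingJoin extends CoProcessFunction[In, In, Out] {
    // Per-key buffers of the values seen so far on each input.
    @transient private var left: ListState[String] = _
    @transient private var right: ListState[String] = _

    override def open(parameters: Configuration): Unit = {
      left = getRuntimeContext.getListState(
        new ListStateDescriptor[String]("left", classOf[String]))
      right = getRuntimeContext.getListState(
        new ListStateDescriptor[String]("right", classOf[String]))
    }

    // ListState.get() may return null when nothing is stored for the key.
    private def buffered(state: ListState[String]): Iterable[String] =
      Option(state.get()).map(_.asScala).getOrElse(Nil)

    // Element from stream1: remember it, then pair it with all buffered right values.
    override def processElement1(
        in: In,
        ctx: CoProcessFunction[In, In, Out]#Context,
        out: Collector[Out]): Unit = {
      left.add(in._2)
      buffered(right).foreach(r => out.collect((in._1, (in._2, r))))
    }

    // Element from stream2: remember it, then pair it with all buffered left values.
    override def processElement2(
        in: In,
        ctx: CoProcessFunction[In, In, Out]#Context,
        out: Collector[Out]): Unit = {
      right.add(in._2)
      buffered(left).foreach(l => out.collect((in._1, (l, in._2))))
    }
  }

  def join(stream1: DataStream[In], stream2: DataStream[In]): DataStream[Out] =
    stream1.connect(stream2)
      .keyBy((l: In) => l._1, (r: In) => r._1)
      .process(new BufferingJoin)
}
```

Unlike the windowed join, this pairs each element with every earlier element of the other stream that has the same key, regardless of time.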

For more than two streams, you can tag the elements of each incoming stream with a tag that identifies the stream they come from. Then union the streams, process them in a single ProcessFunction, and tease the elements from the different streams apart based on their tag.

In both cases you would keep the data for joining within Flink state.
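
The tag-and-union approach might be sketched as follows for four streams. Everything named here is an assumption for illustration: the Tagged wrapper, the tags "s1" to "s4", the (key, value) string element shape, and the choice to keep only the most recent value per source and emit once all four sources have been seen for a key. The code targets the Flink 1.3 API (MapState and the abstract-class ProcessFunction); it never clears state, which a real job would have to handle.

```scala
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TaggedUnionJoinSketch {
  // Hypothetical wrapper: `source` records which input stream an element came from.
  case class Tagged(source: String, key: String, value: String)

  type Joined = (String, (String, String, String, String))

  val Sources = Seq("s1", "s2", "s3", "s4")

  class FourWayJoin extends ProcessFunction[Tagged, Joined] {
    // Per key: source tag -> most recent value received from that source.
    @transient private var latest: MapState[String, String] = _

    override def open(parameters: Configuration): Unit = {
      latest = getRuntimeContext.getMapState(
        new MapStateDescriptor[String, String]("latest", classOf[String], classOf[String]))
    }

    override def processElement(
        in: Tagged,
        ctx: ProcessFunction[Tagged, Joined]#Context,
        out: Collector[Joined]): Unit = {
      latest.put(in.source, in.value)
      // Emit only once every source has contributed a value for this key.
      if (Sources.forall(s => latest.contains(s))) {
        out.collect((in.key,
          (latest.get("s1"), latest.get("s2"), latest.get("s3"), latest.get("s4"))))
      }
    }
  }

  // Wrap each element with the tag of the stream it belongs to.
  def tag(stream: DataStream[(String, String)], source: String): DataStream[Tagged] =
    stream.map((kv: (String, String)) => Tagged(source, kv._1, kv._2))

  def join(
      s1: DataStream[(String, String)],
      s2: DataStream[(String, String)],
      s3: DataStream[(String, String)],
      s4: DataStream[(String, String)]): DataStream[Joined] =
    tag(s1, "s1")
      .union(tag(s2, "s2"), tag(s3, "s3"), tag(s4, "s4"))
      .keyBy((t: Tagged) => t.key)
      .process(new FourWayJoin)
}
```

Keeping only the latest value per source keeps the state small, but it is a different semantic from the tumbling-window join earlier in the thread; buffering all values per source in the state would be closer to a full join at the cost of more state.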

Best,
Aljoscha

> On 28. Jun 2017, at 04:53, yunfan123 <[hidden email]> wrote:
>
> Flink 1.3? I'm using Flink 1.3; how can I implement this?