Hi
I am trying to port my Spark application to Flink. In Spark I used the following command to join multiple streams:

    val stream = stream1.join(stream2).join(stream3).join(stream4)

As I understand it, Flink requires a window operation for stream joins, because Flink doesn't work on RDDs the way Spark does. So I tried the code below to port my Spark code to Flink, but I don't know whether it is the right way to implement a join between multiple streams.

    val stream_join_1 = stream1.join(stream2)
      .where(_.getField(0).toString())
      .equalTo(_.getField(0).toString())
      .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
      .apply { (l, r) =>
        (l.getField(0).toString(), (l.getField(1).toString(), r.getField(1).toString()))
      }

    val stream_join_2 = stream3.join(stream4)
      .where(_.getField(0).toString())
      .equalTo(_.getField(0).toString())
      .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
      .apply { (l, r) =>
        (l.getField(0).toString(), (l.getField(1).toString(), r.getField(1).toString()))
      }

    val stream = stream_join_1.join(stream_join_2)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
      .apply { (l, r) =>
        (l._1, (((l._2._1, l._2._2), r._2._1), r._2._2))
      }

Please help me find the right approach.

Regards,
Prateek
Hi,

I think it is currently the right approach. I hope that we can in the future provide APIs to make such cases more pleasant to implement.

Cheers,
Aljoscha

On Wed, 25 May 2016 at 22:13 prateekarora <[hidden email]> wrote:
> Hi
Hi,

Thanks for the information. It would be good if a future release provided an API that makes such use cases more pleasant to implement.

Regards,
Prateek
In Flink release 1.3, can I do this in a simple way?
Hi,

I’m afraid there is also no simple, built-in feature for doing this in Flink 1.3.

Best,
Aljoscha

> On 27. Jun 2017, at 10:37, yunfan123 <[hidden email]> wrote:
>
> In flink release 1.3, can I do this in simple way?
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-perform-multiple-stream-join-functionality-tp7184p14011.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
Flink 1.3? I'm using Flink 1.3. How can I implement this?
What is the case that you’re trying to implement? For joining two streams you can use a CoProcessFunction, i.e.

    stream1.connect(stream2).process(new MyCoProcessFunction())

For more than two streams, you can tag each element of the incoming streams with a tag that identifies the stream it comes from. Then union the streams, process them in a single ProcessFunction, and tease the elements from the different streams apart based on their tag. In both cases you would keep the data for joining within Flink state.

Best,
Aljoscha

> On 28. Jun 2017, at 04:53, yunfan123 <[hidden email]> wrote:
>
> Flink 1.3? I'm use flink 1.3, how can I do to implement this?
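The tag-and-union approach described above could look roughly like the sketch below. It assumes the Flink 1.x Scala API (flink-streaming-scala on the classpath) and four input streams of (key, value) string pairs. The names Tagged, Joined, JoinLogic, FourWayJoin and MultiJoinSketch are hypothetical and not part of Flink. The sketch keeps only the latest value per source for each key and emits one row once all four sources have been seen; it does not clean up state for keys that never complete, which a real job would probably handle with a timer.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical wrapper: `source` records which input stream (0 to 3) an element came from.
case class Tagged(source: Int, key: String, value: String)

// Hypothetical output row: one value from each of the four sources for a given key.
case class Joined(key: String, v0: String, v1: String, v2: String, v3: String)

object JoinLogic {
  // Pure merge step: adds the element to the per-key buffer and, once all four
  // sources are present, returns an empty buffer together with the joined row.
  def merge(seen: Map[Int, String], in: Tagged): (Map[Int, String], Option[Joined]) = {
    val next = seen + (in.source -> in.value)
    if ((0 to 3).forall(next.contains))
      (Map.empty[Int, String], Some(Joined(in.key, next(0), next(1), next(2), next(3))))
    else
      (next, None)
  }
}

// Runs on a keyed stream, so the state below is scoped to the current key.
class FourWayJoin extends ProcessFunction[Tagged, Joined] {
  private lazy val buffer: ValueState[Map[Int, String]] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[Map[Int, String]](
        "buffer", createTypeInformation[Map[Int, String]]))

  override def processElement(
      in: Tagged,
      ctx: ProcessFunction[Tagged, Joined]#Context,
      out: Collector[Joined]): Unit = {
    val seen = Option(buffer.value()).getOrElse(Map.empty[Int, String])
    val (next, result) = JoinLogic.merge(seen, in)
    if (next.isEmpty) buffer.clear() else buffer.update(next)
    result.foreach(r => out.collect(r))
  }
}

object MultiJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder inputs; in a real job these would be the four source streams.
    val s0 = env.fromElements(("k", "a"))
    val s1 = env.fromElements(("k", "b"))
    val s2 = env.fromElements(("k", "c"))
    val s3 = env.fromElements(("k", "d"))

    // Tag each stream with its origin, union them, key by the join key, and join in state.
    val tagged = s0.map(t => Tagged(0, t._1, t._2)).union(
      s1.map(t => Tagged(1, t._1, t._2)),
      s2.map(t => Tagged(2, t._1, t._2)),
      s3.map(t => Tagged(3, t._1, t._2)))

    tagged.keyBy(_.key).process(new FourWayJoin).print()
    env.execute("multi-stream join sketch")
  }
}
```

Keeping the merge step in a plain function is only a design choice for this sketch: it lets the join rule be checked without starting a Flink job. Unlike the windowed joins in the original question, this version has no window, so elements with the same key are matched regardless of how far apart they arrive.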