Hello,

I'm trying to implement a left outer join of two Kafka streams within a sliding window. So far I have the following code:

    foos
      .coGroup(bars)
      .where(_.baz).equalTo(_.baz)
      .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS)))
      .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) =>
        fs.foreach(f =>
          if (bs.isEmpty)
            o.collect(FooBar(f, None))
          else
            bs.foreach(b => o.collect(FooBar(f, Some(b))))
        )
      )

However, this results in the pair being emitted from every window slide, regardless of the match. The desired behaviour would be:

* emit the match as soon as it's found, and don't emit any more pairs for it;
* otherwise, emit the empty match when the left-side element leaves the last of its windows.

What would be the idiomatic/efficient way to implement such behaviour? Is it possible at all with the coGroup/window mechanism, or is some other way necessary?

Alex

Hi,

I’m afraid there is currently no way to do what you want with the built-in window primitives. Each slice of a sliding window is essentially evaluated independently, so effects in one slice cannot influence the processing of another slice.

What you could do is switch to tumbling windows; then each element would only be in one window. That probably won’t fit your use case anymore, though. The alternative I see is to implement everything in a custom operator where you deal with window state and time-based triggering yourself. Let me know if you need some pointers about that one.

Cheers,
Aljoscha

> On 26 Jan 2016, at 19:32, Alexander Gryzlov <[hidden email]> wrote:

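To make the point about independently evaluated slices concrete, here is a small framework-free Scala sketch. It does not use any Flink API; `windowStarts` is a hypothetical helper introduced only for illustration, and it assumes windows are aligned to multiples of the slide and cover the half-open range [start, start + size). Under those assumptions it lists the windows that a single element falls into:

```scala
// Illustration only: `SlidingWindowAssignment` and `windowStarts` are hypothetical names,
// not part of Flink. Timestamps and durations are in milliseconds, with ts >= 0.
object SlidingWindowAssignment {
  /** Start times of every window [start, start + size) that contains `ts`. */
  def windowStarts(ts: Long, size: Long, slide: Long): Seq[Long] = {
    val lastStart = ts - (ts % slide)     // latest aligned window start that is <= ts
    // Walk backwards one slide at a time while the window still covers ts,
    // i.e. while start > ts - size (the end bound is exclusive).
    lastStart.until(ts - size, -slide)
  }
}
```

With the 1-minute size and 1-second slide from the question, `SlidingWindowAssignment.windowStarts(90000L, 60000L, 1000L).size` is 60: each element is handed to 60 separate window evaluations, which is consistent with the pair being emitted on every slide.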
Hello Aljoscha,

Indeed, it seems like I'd need a custom operator. I imagine this involves implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator? Could you provide those pointers, please?

Alex

On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek <[hidden email]> wrote:

Hi!

I think this pull request may be implementing what you are looking for: https://github.com/apache/flink/pull/1527

Stephan

On Wed, Jan 27, 2016 at 2:06 PM, Alexander Gryzlov <[hidden email]> wrote:

Hello Stephan,

Yes, I've seen this one before, but AFAIU it is a different use case: they need an inner join with two different windows, whereas I'm OK with a single window but need an outer join with different semantics. Their StreamJoinOperator, however, looks like a rough fit, so I'll probably start by hacking it, unless Aljoscha or somebody else with more experience than me has a better idea :)

Alex

On Wed, Jan 27, 2016 at 10:53 PM, Stephan Ewen <[hidden email]> wrote:

Hi,

I think hacking the StreamJoinOperator could work for you. In Flink, a StreamOperator is essentially a more low-level version of a FlatMap. It receives elements through the processElement() method and can emit elements using the Output object "output", which is a souped-up Collector that also allows for watermark emission. The StreamOperator also has a StreamingRuntimeContext that can be used to register timer callbacks, to perform work at specific time intervals or after timeouts.

One thing to note is that StreamOperators always deal with StreamRecord<T>, which is a wrapper for the user type T that also carries a timestamp.

Cheers,
Aljoscha

> On 28 Jan 2016, at 14:31, Alexander Gryzlov <[hidden email]> wrote:

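The bookkeeping that such a custom operator would need can be modelled without Flink. The following is only a sketch under stated assumptions, not the StreamJoinOperator from the pull request and not any real operator API: the class `LeftOuterJoinSketch`, its methods `onFoo`, `onBar` and `onTime`, and the `id` fields on `Foo` and `Bar` are all hypothetical. It also simplifies the problem: a `Bar` only matches `Foo`s that arrived before it, since bars are not buffered.

```scala
import scala.collection.mutable

// Hypothetical types mirroring the names used in the thread.
case class Foo(baz: String, id: Int)
case class Bar(baz: String, id: Int)
case class FooBar(foo: Foo, bar: Option[Bar])

/** Framework-free model of "emit on first match, else emit an empty match on timeout". */
class LeftOuterJoinSketch(windowMs: Long) {
  // Unmatched left elements per key, each paired with the time at which it expires.
  private val pending = mutable.Map.empty[String, mutable.Buffer[(Foo, Long)]]

  /** Left input: buffer the element until a match arrives or it times out. */
  def onFoo(f: Foo, ts: Long): Unit = {
    val buf = pending.getOrElseUpdate(f.baz, mutable.Buffer.empty[(Foo, Long)])
    buf += ((f, ts + windowMs))
  }

  /** Right input: match every still-live buffered Foo with this key once, then forget it. */
  def onBar(b: Bar, ts: Long): Seq[FooBar] = {
    val buffered = pending.getOrElse(b.baz, mutable.Buffer.empty[(Foo, Long)])
    val (live, expired) = buffered.partition(_._2 > ts)
    // Already-expired elements stay buffered so that onTime can emit their empty match.
    if (expired.isEmpty) pending.remove(b.baz) else pending(b.baz) = expired
    live.map { case (f, _) => FooBar(f, Some(b)) }.toSeq
  }

  /** Time callback: emit an empty match for every Foo whose deadline has passed. */
  def onTime(now: Long): Seq[FooBar] = {
    val out = mutable.Buffer.empty[FooBar]
    for (key <- pending.keys.toList) {
      val (expired, live) = pending(key).partition(_._2 <= now)
      expired.foreach { case (f, _) => out += FooBar(f, None) }
      if (live.isEmpty) pending.remove(key) else pending(key) = live
    }
    out.toSeq
  }
}
```

In an actual operator, `onFoo` and `onBar` would presumably correspond to the two per-input element callbacks and `onTime` to a registered timer or a watermark callback, with the buffer kept in operator state rather than in a plain map.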