Dear Community,
I'm really struggling with a co-grouped stream. The workload is the following:

    val firstStream: DataStream[FirstType] =
      firstRaw.assignTimestampsAndWatermarks(new MyCustomFirstExtractor(maxOutOfOrder))

    val secondStream: DataStream[SecondType] = secondRaw
      .assignTimestampsAndWatermarks(new MyCustomSecondExtractor(maxOutOfOrder))
      .map(new toSecondsStreamMapper())

where both extractors extend BoundedOutOfOrdernessTimestampExtractor, overriding the extractTimestamp method to return the timestamp carried by the FirstType and SecondType objects respectively:

    override def extractTimestamp(first: FirstType): Long = first.timestamp

Then I call coGroup as follows:

    val stockDetails = firstStream
      .coGroup(secondStream)
      .where(_.id)
      .equalTo(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .apply(new MyCogroupFunction())
      .uid("myCogroup")
      .name("My CoGroup")

The problem is that the CoGroup function is never triggered. I ran several tests and was not able to solve it. The first relevant point is that event time can be seriously out of order; I can even run into a timestamp of 0. I also faked the timestamps so that they spread over a span of two seconds, five seconds, and so forth. These attempts did not change the behavior at all: no window ever fires.

Another relevant point: I'm running locally and reading from a pre-loaded Kafka topic, so all the events are read sequentially at startup.
I will give a couple of examples.

Workload 1 (faked timestamps), fields (id, timestamp):

    FirstType(9781783433803, 1490280129517)
    FirstType(9781783433803, 1490280129517)
    FirstType(9781783433803, 1490280131191)
    FirstType(9781783433803, 1490280131191)
    FirstType(9781783433803, 1490280131214)
    FirstType(9781783433803, 1490280131214)
    FirstType(9781783433803, 1490280131250)
    FirstType(9781783433803, 1490280131250)
    FirstType(9781783433803, 1490280131294)
    FirstType(9781783433803, 1490280131294)
    FirstType(9781783433803, 1490280131328)
    FirstType(9781783433803, 1490280131328)
    SecondType(9781783433803, 1490280130465)
    SecondType(9781783433803, 1490280131027)
    SecondType(9781783433803, 1490280131051)
    SecondType(9781783433803, 1490280131070)
    SecondType(9781783433803, 1490280131085)
    SecondType(9781783433803, 1490280131103)
    SecondType(9781783433803, 1490280131124)
    SecondType(9781783433803, 1490280131143)
    SecondType(9781783433803, 1490280131158)
    SecondType(9781783433803, 1490280131175)

Workload 2 (real-case timestamps):

    1> FirstType(9781783433803, 1490172958602)
    1> FirstType(9781783433803, 1490172958611)
    1> FirstType(9781783433803, 1490172958611)
    1> FirstType(9781783433803, 1490172958620)
    1> FirstType(9781783433803, 1490172958620)
    1> FirstType(9781783433803, 1490196171869)
    1> FirstType(9781783433803, 1490196171869)
    SecondType(9781783433803, 0)
    SecondType(9781783433803, 0)
    SecondType(9781783433803, 1488834670490)
    SecondType(9781783433803, 1489577984143)
    SecondType(9781783433803, 0)
    SecondType(9781783433803, 0)
    SecondType(9781783433803, 0)
    SecondType(9781783433803, 1488834670490)
    SecondType(9781783433803, 1489577984143)
    SecondType(9781783433803, 1489689399726)
    SecondType(9781783433803, 1489689399726)

I confirm that I have healthy incoming streams at the entrance of the coGroup operator. I think I'm likely missing something easy. Any help will be really appreciated.

Sincerely,
Andrea
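As a side check on Workload 1, here is a small sketch in plain Scala (no Flink dependency) of how a one-second tumbling window would bucket those timestamps. It assumes the window start is computed as timestamp - (timestamp % size) with no offset; the object and method names are made up for illustration and are not part of the job above.

```scala
object WindowAssignmentSketch {
  // Window size used in the job: Time.seconds(1)
  val windowSizeMs: Long = 1000L

  // Assumed assignment rule for non-negative timestamps and a zero offset
  def windowStart(timestamp: Long): Long = timestamp - (timestamp % windowSizeMs)

  // A few timestamps taken from Workload 1
  val firstTimestamps: Seq[Long] = Seq(1490280129517L, 1490280131191L, 1490280131328L)
  val secondTimestamps: Seq[Long] = Seq(1490280130465L, 1490280131027L, 1490280131175L)

  // Window starts that contain elements from both streams
  def sharedWindows: Set[Long] =
    firstTimestamps.map(windowStart).toSet intersect secondTimestamps.map(windowStart).toSet

  def main(args: Array[String]): Unit = {
    println(s"first:  ${firstTimestamps.map(windowStart).distinct}")
    println(s"second: ${secondTimestamps.map(windowStart).distinct}")
    println(s"shared: $sharedWindows")
  }
}
```

Under that assumption, the window starting at 1490280131000 holds elements from both sides, so the data itself gives the window something to fire on. That points toward the watermark, rather than the window assignment, as the place to look.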
Sorry, I forgot to mention the Flink version: 1.1.2.
Thanks, Andrea
In reply to this post by Andrea Spina
Dear community,
I finally solved the issue I had run into. Basically, the cause of the problem was the behavior of my input: the two incoming streams behaved very differently (second-type events are really late and scarce in event time).

The solution I employed was to assign timestamps and watermarks to the source stream just before splitting it into the streams that handle my first type and second type. I suppose this solved my problem because the watermark the EventTimeTrigger sees (through TriggerContext.getCurrentWatermark()) is, I think, the minimum of the watermarks of the input streams. So the window was hanging because of the incoming rate of the second-type stream, whose watermark lagged far behind.

Hope it could help someone in the future.

Cheers,
Andrea
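To illustrate the explanation above, here is a minimal simulation in plain Scala (no Flink dependency) using timestamps from Workload 2. It rests on two assumptions: each stream's watermark is the highest timestamp seen so far minus the out-of-orderness bound, and a two-input operator advances to the minimum of its input watermarks. The bound of 1000 ms is hypothetical (the thread does not give the real maxOutOfOrder value), and all names are made up for the sketch.

```scala
object MinWatermarkSketch {
  // Hypothetical out-of-orderness bound; the real value is not given in the thread
  val maxOutOfOrderMs: Long = 1000L
  val windowSizeMs: Long = 1000L

  // Bounded-out-of-orderness style watermark: highest timestamp seen minus the bound
  def watermark(timestamps: Seq[Long]): Long = timestamps.max - maxOutOfOrderMs

  // Assumption: a two-input operator follows the minimum of its input watermarks
  def combined(first: Seq[Long], second: Seq[Long]): Long =
    math.min(watermark(first), watermark(second))

  // A window [start, start + size) fires once the watermark reaches its last timestamp
  def fires(windowStart: Long, currentWatermark: Long): Boolean =
    currentWatermark >= windowStart + windowSizeMs - 1

  // Timestamps taken from Workload 2
  val first: Seq[Long] = Seq(1490172958602L, 1490172958611L, 1490172958620L, 1490196171869L)
  val second: Seq[Long] = Seq(0L, 1488834670490L, 1489577984143L, 1489689399726L)

  // Start of the one-second window holding the earliest FirstType element
  val start: Long = 1490172958000L

  def main(args: Array[String]): Unit = {
    println(s"first-only watermark fires window: ${fires(start, watermark(first))}")
    println(s"combined watermark fires window:   ${fires(start, combined(first, second))}")
  }
}
```

With these numbers the first stream alone would push the watermark past the window, but the combined watermark is held back by the second stream, so the window stays open. Assigning timestamps and watermarks once on the shared source, before the split, would give both branches the same watermark, which is consistent with the fix described above.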
Hi,
thanks for letting us know! And sorry that you didn’t get any response from the community; I myself just got back from vacation, so I’m only now catching up on mail.

Best,
Aljoscha

> On 30. Mar 2017, at 18:24, Andrea Spina <[hidden email]> wrote:
>
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cogrouped-Stream-never-triggers-tumbling-event-time-window-tp12373p12468.html