Hi!
While working with grouping and windowing I encountered some strange behavior. I'm doing: dataStream.groupBy(KeySelector).window(Time.of(x, TimeUnit.SECONDS)).mapWindow(toString).flatten() When I run the program containing this snippet it initially outputs data at a rate of around 150 events per second (that is roughly the input rate for the program). After about 10-30 minutes the rate drops to below 5 events per second, and the event delivery offsets get bigger and bigger ... Any explanation for this? I know you are reworking the streaming API, but it would be useful to know why this happens ... Cheers. Rico.
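For readers unfamiliar with the pipeline above, its logic can be sketched outside Flink. The following toy Python simulation (the function and event layout are illustrative, not Flink API) groups events by key into tumbling time windows and maps each window to a string, mirroring groupBy → window → mapWindow(toString) → flatten:

```python
from collections import defaultdict

def tumbling_windows(events, window_size):
    """Toy stand-in for groupBy(...).window(...).mapWindow(toString).flatten().

    events: iterable of (timestamp, key, value); window_size in the same time unit.
    """
    windows = defaultdict(list)  # (window_start, key) -> collected values
    for ts, key, value in events:
        window_start = (ts // window_size) * window_size
        windows[(window_start, key)].append(value)
    # "mapWindow(toString)" then "flatten": emit one string per (window, key)
    return [f"[{start},{start + window_size}) {key}: {vals}"
            for (start, key), vals in sorted(windows.items())]

events = [(0, "a", 1), (1, "a", 2), (1, "b", 3), (5, "a", 4)]
for line in tumbling_windows(events, window_size=5):
    print(line)
```

Each key gets its own window content per time slice; note that the grouping structure holds an entry per distinct key, which becomes relevant later in this thread.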
|
Hej, This sounds like it could be a garbage collection problem. Do you instantiate any classes inside any of the operators (e.g. in the KeySelector)? You can also try to run it locally and use something like jstat to rule this out. cheers Martin On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann <[hidden email]> wrote:
|
Hi! I also think it's a GC problem. In the KeySelector I don't instantiate any object, it's a simple toString method call. In the mapWindow I create new objects. But I'm doing the same in other map operators, too, and they don't slow down the execution. Only with this construct is the execution slowed down. I watched the memory footprint of my program, once with the code construct I wrote and once without. The memory characteristics were the same. The CPU usage, too ... I don't have an explanation, but I don't think it comes from my operator functions ... Cheers Rico.
|
The web interface of Flink has a tab for the TaskManagers. There, you can also see how much time the JVM spent on garbage collection. Can you check whether the number of GC calls and the time spent go up after 30 minutes? On Tue, Sep 8, 2015 at 8:37 AM, Rico Bergmann <[hidden email]> wrote:
|
Where can I find this information? I can see the memory usage and CPU load. But where is the information on the GC?
|
It is in the "Information" column: http://i.imgur.com/rzxxURR.png In the screenshot, the two GCs only spent 84 and 25 ms. On Tue, Sep 8, 2015 at 10:34 AM, Rico Bergmann <[hidden email]> wrote:
|
The marksweep value is very high, the scavenge value very low. If this helps ;-)
|
I also see in the TM overview that the CPU load is still around 25%, although there has been no input to the program for minutes. The CPU load is degrading very slowly. The memory consumption is still fluctuating at a high level; it does not go down. In my test I generated test input for 1 minute. Now 10 minutes are over ... I think there must be something wrong with Flink ...
|
Hi Rico, I have a suspicion. What is the distribution of your keys? That is, are there many unique keys, do the keys keep evolving, i.e. is it always new and different keys? Cheers, Aljoscha On Tue, 8 Sep 2015 at 13:44 Rico Bergmann <[hidden email]> wrote:
|
Yes. The keys are constantly changing. Indeed, each unique event has its own key (the event itself). The purpose was to do event deduplication ...
|
Ok, that's a special case, but the system still shouldn't behave that way. The problem is that the grouped discretizer that is responsible for grouping the elements into grouped windows keeps state for every key that it encounters. And that state is never released, ever. That's the reason for the high memory consumption and GC load. On Wed, 9 Sep 2015 at 07:01 Rico Bergmann <[hidden email]> wrote:
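The failure mode described here — per-key state that is never released — can be reproduced in miniature. This hedged Python sketch (a toy model, not the actual Flink discretizer code) shows how state that is keyed per encountered key grows without bound when every event carries a unique key, as in the deduplication use case:

```python
class LeakyDiscretizer:
    """Toy model of a grouped discretizer that never evicts per-key state."""

    def __init__(self):
        self.state = {}  # key -> window state; entries are never released

    def process(self, key, value):
        # Every new key allocates a state entry that lives forever.
        self.state.setdefault(key, []).append(value)

disc = LeakyDiscretizer()
# Unique key per event (e.g. the event itself, as in deduplication):
for i in range(100_000):
    disc.process(f"event-{i}", i)

print(len(disc.state))  # one live state entry per event ever seen
```

With a bounded key set the state map stays small; with ever-new keys it grows linearly with the input, which matches the observed heap growth and rising GC (mark-sweep) time.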
|
Aljoscha and I are currently working on an alternative windowing implementation. That new implementation will support out-of-order event time and release keys properly. We will hopefully have a first version to try out in a week or so... Greetings, Stephan On Wed, Sep 9, 2015 at 9:08 AM, Aljoscha Krettek <[hidden email]> wrote:
|
Btw, there was a discussion about this problem a while back:
https://mail-archives.apache.org/mod_mbox/flink-dev/201506.mbox/%3CCADXjeyCi9_OPRO4oQtzhvi-gifeK6_66YbtJZ_PB0aQOp_NijA@...%3E And here is the JIRA: https://issues.apache.org/jira/browse/FLINK-2181 Best, Gabor
|
Hi Rico! We have finished the first part of the Window API reworks. You can find the code here: https://github.com/apache/flink/pull/1175 It should fix the issues and offer vastly improved performance (up to 50x faster). For now, it supports time windows, but we will support the other cases in the next days. I'll ping you once it is merged, I'd be curious if it fixes your issue. Sorry that you ran into this problem... Greetings, Stephan On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann <[hidden email]> wrote:
|
Hi! Sounds great. How can I get the source code before it's merged to the master branch? Unfortunately I only have 2 days left for trying this out ... Greets. Rico.
|
Hi Rico, you should be able to get it with these steps:

git clone https://github.com/StephanEwen/incubator-flink.git flink
cd flink
git checkout -t origin/windows

This will get you onto Stephan's windowing branch. Then you can do a

mvn clean install -DskipTests

to build it. I will merge his stuff later today, then you should also be able to use it by running the 0.10-SNAPSHOT version. Cheers, Aljoscha On Thu, 24 Sep 2015 at 09:11 Rico Bergmann <[hidden email]> wrote:
|
I took a first glance. I ran 2 test setups. One with a limited test data generator that outputs around 200 events per second. In this setting the new implementation keeps up with the incoming message rate. The other setup had unlimited generation (at the highest possible rate). There the same problem as before can be observed: after 2 minutes of runtime the output of my program is more than a minute behind, and increasing over time. But I don't know whether this could be a setup problem. I noticed the OS load of my test system was around 90%, so it might be more of a setup problem ... Thanks for your support so far. Cheers. Rico.
|
And as a side note: the problem with duplicates seems to be solved, too! Cheers Rico.
|
Hi Rico, are you generating the data directly in your Flink program or in some external queue, such as Kafka? Cheers, Aljoscha On Thu, 24 Sep 2015 at 13:47 Rico Bergmann <[hidden email]> wrote:
|
Hi Rico! When you say that the program falls behind the unlimited generating source, I assume you have some unbounded buffering channel (like Kafka) between the generator and the Flink job. Is that correct? Flink itself backpressures to the sources, but if the source is Kafka, this does of course not affect the Kafka data producer. In that case, you probably "underprovisioned" the streaming job for the data rate. The new windowing should have much better throughput, but it may not be high enough for the data rate, which means you probably need more cores. It may be worth checking other aspects of the program. Depending on what types you use, serialization can be expensive (especially for types like JSON). Also, please make sure you start the system in streaming mode ("start-cluster-streaming.sh" rather than "start-cluster.sh") - that makes a difference in memory behavior for streaming jobs. Greetings, Stephan On Thu, Sep 24, 2015 at 2:53 PM, Aljoscha Krettek <[hidden email]> wrote:
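The underprovisioning argument above has a simple arithmetic core: when the producer is decoupled from the job (e.g. behind Kafka), backpressure cannot slow it down, so the backlog grows by the rate gap every second. This small Python sketch (illustrative rates, not measurements from Rico's setup) makes that visible:

```python
def simulate_lag(produce_rate, consume_rate, seconds):
    """Backlog growth when an external producer outruns the streaming job.

    Backpressure cannot reach the producer, so each second the queue grows
    by produce_rate and shrinks by at most consume_rate.
    """
    backlog = 0
    for _ in range(seconds):
        backlog += produce_rate
        backlog -= min(backlog, consume_rate)
    return backlog

# Hypothetical numbers: generator at 1000 ev/s, job sustaining 800 ev/s.
print(simulate_lag(1000, 800, seconds=120))  # 200 ev/s gap * 120 s = 24000 events behind
```

As long as consume_rate >= produce_rate the backlog stays at zero; any shortfall accumulates linearly, which is consistent with the "more than a minute behind and increasing" observation and with Stephan's advice to add cores or cut serialization cost.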
|