heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

Steven Wu
Hi,

I was using Chaos Monkey to test Flink's behavior under frequent killing of TaskManager nodes. I found that stopped/disposed StreamTask instances are retained by the Java finalizer. It looks like a memory leak: each StreamTask retains 2.6 MB of memory, so with 20 kills (and job restarts) on an 8-CPU container, 2.6 * 20 * 8 = 416 MB is retained in the heap.

Inline image 1

finalize() is generally not recommended for cleanup, because "Finalizers are unpredictable, often dangerous, and generally unnecessary", quoted from Joshua Bloch's book.

This code from StreamTask.java seems to be the cause. Is it necessary? Can it be removed? We are using the flink-1.2 release branch, but I see the same code in flink-1.3 and the master branch.

/**
 * The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original
 * shutdown method was never called.
 *
 * <p>
 * This should not be relied upon! It will cause shutdown to happen much later than if manual
 * shutdown is attempted, and cause threads to linger for longer than needed.
 */
@Override
protected void finalize() throws Throwable {
    super.finalize();
    if (timerService != null) {
        if (!timerService.isTerminated()) {
            LOG.info("Timer service is shutting down.");
            timerService.shutdownService();
        }
    }

    cancelables.close();
}

Thanks,
Steven

Streaming API has a long delay at the beginning of the process.

Yuta Morisawa
Hi,

I am worried about the latency of the Streaming API.
My application gets data from kafka-connectors, processes it, and then
pushes the results to kafka-producers.
The problem is that the app suffers a long delay when the first data
arrives in the cluster.
It takes about 1000 ms to process the data (I measure the time with the
kafka-timestamp). On the other hand, it works well once 2-3 seconds have
passed since the first data arrived (the delay is then about 200 ms).

The application is so latency-sensitive that I want to solve this problem.
For now, I think this is a JVM matter, but I have no idea how to investigate it.
Is there any way to avoid this delay?



Thank you for your attention
Yuta

Re: heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

Fabian Hueske-2
In reply to this post by Steven Wu
Hi Steven,

thanks for reporting this issue.
Looping in Till who's more familiar with the task lifecycles.

Thanks, Fabian


Re: Streaming API has a long delay at the beginning of the process.

Fabian Hueske-2
In reply to this post by Yuta Morisawa
Hi,

If I understand you correctly, the problem is only for the first events that are processed.

AFAIK, Flink lazily instantiates its operators which means that a source task starts to consume records from Kafka before the subsequent tasks have been started.
That's why the latency of the first records is higher.

Not sure if or what can be done about this behavior.
I'll loop in Till who knows more about the lifecycle of tasks.

Best, Fabian



Re: Streaming API has a long delay at the beginning of the process.

Yuta Morisawa
Hi, Fabian

 > If I understand you correctly, the problem is only for the first events
 > that are processed.
Yes. More precisely, the first 300 Kafka messages.

 > AFAIK, Flink lazily instantiates its operators which means that a source
 > task starts to consume records from Kafka before the subsequent tasks
 > have been started.
That's a great pointer. It describes the situation well.
But the documentation says: "The operations are actually
executed when the execution is explicitly triggered by an execute() call
on the execution environment."
What does that mean?
AFAIK, typical Flink programs invoke execute() in main().
Do all operators start at that time? I think maybe not.

- Flink Document
 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#lazy-evaluation


 > Not sure if or what can be done about this behavior.
 > I'll loop in Till who knows more about the lifecycle of tasks.
Thank you very much for your kindness.

Regards, Yuta


Re: Streaming API has a long delay at the beginning of the process.

Fabian Hueske-2
Hi Yuta,

when the execute() method is called, a so-called JobGraph is constructed from all operators that were added beforehand by calling map(), keyBy(), and so on.
The JobGraph is then submitted to the JobManager which is the master process in Flink. Based on the JobGraph, the master deploys tasks to the worker processes (TaskManagers).
These are the tasks that do the actual processing and they are subsequently started as I explained before, i.e., the source task starts consuming from Kafka before subsequent tasks have been started.

So, there is quite a lot happening when you call execute() including network communication and task deployment.

Hope this helps,
Fabian
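
The lazy evaluation described above can be illustrated with a toy model in plain Java. This is only a sketch and does not use Flink's API: the class LazyPipelineSketch and its methods map(), execute() and appliedCount() are hypothetical names invented for this example. Calling map() only records an operator (loosely analogous to adding a node to the JobGraph), and nothing is processed until execute() is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Toy model of lazy evaluation. Hypothetical class, NOT Flink's API. */
final class LazyPipelineSketch {
    private final List<Function<Integer, Integer>> operators = new ArrayList<>();
    private int applied = 0; // number of operator invocations that actually ran

    /** Only records the operator; no data is processed here. */
    LazyPipelineSketch map(Function<Integer, Integer> fn) {
        operators.add(fn);
        return this;
    }

    int appliedCount() {
        return applied;
    }

    /** Runs every recorded operator over the input (a much simplified execute()). */
    List<Integer> execute(List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        for (Integer value : input) {
            Integer v = value;
            for (Function<Integer, Integer> op : operators) {
                v = op.apply(v);
                applied++;
            }
            out.add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        LazyPipelineSketch pipeline = new LazyPipelineSketch()
                .map(x -> x + 1)
                .map(x -> x * 2);
        System.out.println("applied before execute(): " + pipeline.appliedCount());
        System.out.println("result: " + pipeline.execute(Arrays.asList(1, 2, 3)));
        System.out.println("applied after execute(): " + pipeline.appliedCount());
    }
}
```

In a real Flink job, execute() additionally involves JobGraph submission, network communication and task deployment, none of which this toy model attempts to reproduce.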


Re: Streaming API has a long delay at the beginning of the process.

Yuta Morisawa
Hi Fabian,

Thank you for your explanation.

This is my understanding:
1) At the moment the execute() method is called, Flink creates the JobGraph,
submits it to the JobManager, and deploys tasks to the TaskManagers, but DOES
NOT yet execute the operators.
2) Operators are executed when they are needed.
3) Sources (kafka-connectors) start before the other operators.
4) The first time an operator is called, or after GC removes an operator's
instance, some initialization occurs, such as class loading,
instantiation, memory allocation and so on. This may take considerable time.

If I have misunderstood anything, please let me know.
If not, my question is solved.

Regards.
Yuta


Re: Streaming API has a long delay at the beginning of the process.

Fabian Hueske-2
Hi Yuta,

you got most things right :-)

3) Sources (such as Kafka connectors) are also considered operators and start immediately because they are sources.
4) All other operators start when they need to process their first record. Operators are never GC'd (unless a job is cancelled), so the setup cost is a one-time cost that is only paid when the job starts.

Best, Fabian
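
As a rough illustration of the one-time setup cost described in point 4, here is a minimal sketch in plain Java. It assumes a hypothetical LazyOperatorSketch class (not a Flink class) whose setup supplier stands in for class loading, instantiation and memory allocation. The setup runs when the first record arrives, and later records reuse the already initialized state.

```java
import java.util.function.Supplier;

/** Toy operator with setup on the first record. Hypothetical, not a Flink class. */
final class LazyOperatorSketch {
    private final Supplier<int[]> setup; // stands in for class loading, allocation, etc.
    private int[] state;                 // stays null until the first record arrives
    private int setupCalls = 0;

    LazyOperatorSketch(Supplier<int[]> setup) {
        this.setup = setup;
    }

    /** Adds the record to a running sum; the first call also pays the setup cost. */
    int process(int record) {
        if (state == null) {
            state = setup.get();
            setupCalls++;
        }
        state[0] += record;
        return state[0];
    }

    int setupCalls() {
        return setupCalls;
    }

    public static void main(String[] args) {
        LazyOperatorSketch op = new LazyOperatorSketch(() -> new int[1]);
        for (int i = 1; i <= 3; i++) {
            long start = System.nanoTime();
            int sum = op.process(i);
            long micros = (System.nanoTime() - start) / 1_000;
            System.out.println("record " + i + " -> sum " + sum + " (" + micros + " us)");
        }
        System.out.println("setup ran " + op.setupCalls() + " time(s)");
    }
}
```

The timings printed by main() vary from run to run; the point is only that the setup counter stays at one no matter how many records follow.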


Re: heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

Till Rohrmann
In reply to this post by Fabian Hueske-2
Hi Steven,

the finalize method in StreamTask acts as a safety net in case the services of the StreamTask haven't been properly shut down. In the code, however, it looks as if the TimerService, for example, is always being stopped in the finally block of the invoke method. Thus, it might not be necessary to have the finalize method as a safety net.

How did you kill the TaskManagers? I assume you didn't kill the JVM process because otherwise you wouldn't see the finalizer objects piling up.

I think that you can create a JIRA issue for removing the finalizer method.

Cheers,
Till
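
The pattern Till describes, stopping services in a finally block so that no finalizer is needed as a safety net, might look roughly like the sketch below. All names here (ExplicitShutdownSketch, TimerResource, invoke) are hypothetical stand-ins and not Flink's actual StreamTask or TimerService code. The close() method is idempotent, so a second call from another cleanup path is harmless.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a task that owns a timer service. Not Flink's StreamTask. */
final class ExplicitShutdownSketch {

    /** Minimal resource whose close() may safely be called more than once. */
    static final class TimerResource implements AutoCloseable {
        private final AtomicBoolean terminated = new AtomicBoolean(false);
        private int shutdowns = 0;

        boolean isTerminated() {
            return terminated.get();
        }

        int shutdowns() {
            return shutdowns;
        }

        @Override
        public void close() {
            // compareAndSet ensures the shutdown logic runs only on the first call
            if (terminated.compareAndSet(false, true)) {
                shutdowns++;
            }
        }
    }

    /** Runs the task body and always releases the timer, even if the body throws. */
    static void invoke(TimerResource timer, Runnable body) {
        try {
            body.run();
        } finally {
            timer.close();
        }
    }

    public static void main(String[] args) {
        TimerResource timer = new TimerResource();
        try {
            invoke(timer, () -> {
                throw new IllegalStateException("simulated task failure");
            });
        } catch (IllegalStateException e) {
            System.out.println("task failed: " + e.getMessage());
        }
        System.out.println("timer terminated: " + timer.isTerminated());
    }
}
```

With cleanup guaranteed on the normal and the failure path, the object has no reason to override finalize() and can be collected in the first GC cycle after it becomes unreachable.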




Re: Streaming API has a long delay at the beginning of the process.

Yuta Morisawa
In reply to this post by Fabian Hueske-2
Hi Fabian,

Thanks a lot.
I got a better understanding.

 > Operators are never GC'd (unless a job was cancelled)
That's great information.
Maybe this is related to the so-called Managed Memory.
The documentation would be better if it included a detailed description of
memory management.

Thank you,
Yuta


Re: heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

Stephan Ewen
In reply to this post by Till Rohrmann
Hi!

From my understanding, overriding finalize() still has some use cases and is valid if done correctly (although PhantomReference gives more control over the cleanup process). finalize() is still used in JDK classes as well.

Whenever one overrides finalize(), the object cannot be immediately garbage collected because the finalize() method may make it reachable again. It results in the following life cycle:

  1) object becomes unreachable, is detected eligible for GC
  2) In the GC cycle, object is NOT collected, but finalize() is called
  3) If object is still not reachable, it will be collected in the subsequent GC cycle

In essence, objects that override finalize() stay for one more GC cycle. That may be what you are seeing. It should not be a real memory leak, but deferred memory release.

Is this a problem that is affecting the system, or only something that seems odd for now?

If you are very concerned about this, would you be up for contributing a change that uses a PhantomReference and ReferenceQueue for cleanup instead?

Stephan
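
A cleanup based on PhantomReference and ReferenceQueue, as suggested above, could be sketched as follows. This is a minimal illustration under stated assumptions, not a proposed patch: PhantomCleanupSketch, CleanupRef and drain() are hypothetical names, and a plain Object stands in for the task. The cleanup action must not hold a reference to the tracked object, otherwise the object never becomes unreachable.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of queue-based cleanup. Not Flink code. */
final class PhantomCleanupSketch {

    /** Phantom reference that carries the cleanup action for its referent. */
    static final class CleanupRef extends PhantomReference<Object> {
        private final Runnable cleanup;

        CleanupRef(Object referent, ReferenceQueue<Object> queue, Runnable cleanup) {
            super(referent, queue);
            this.cleanup = cleanup;
        }

        void runCleanup() {
            cleanup.run();
        }
    }

    /** Drains the queue, runs the cleanup of each enqueued reference, returns the count. */
    static int drain(ReferenceQueue<Object> queue) {
        int cleaned = 0;
        Reference<?> ref;
        while ((ref = queue.poll()) != null) {
            ((CleanupRef) ref).runCleanup();
            cleaned++;
        }
        return cleaned;
    }

    public static void main(String[] args) throws InterruptedException {
        ReferenceQueue<Object> queue = new ReferenceQueue<>();
        AtomicInteger closed = new AtomicInteger();
        Object task = new Object();
        // The reference object itself must stay strongly reachable to be enqueued.
        CleanupRef ref = new CleanupRef(task, queue, closed::incrementAndGet);

        task = null; // drop the only strong reference to the tracked object
        // GC timing is not guaranteed, so this loop is best effort only.
        for (int i = 0; i < 50 && closed.get() == 0; i++) {
            System.gc();
            Thread.sleep(20);
            drain(queue);
        }
        System.out.println("cleanup ran: " + (closed.get() > 0));
        System.out.println("reference enqueued or cleared: " + !ref.isEnqueued());
    }
}
```

Unlike an object that overrides finalize(), the tracked object here cannot be made reachable again by the cleanup code, so the collector does not need to keep it alive for an extra cycle.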


On Tue, Sep 19, 2017 at 12:56 AM, Till Rohrmann <[hidden email]> wrote:
Hi Steven,

the finalize method in StreamTask acts as a safety net in case the services of the StreamTask haven't been properly shut down. In the code, however, it looks as if the TimerService, for example, is always being stopped in the finally block of the invoke method. Thus, it might not be necessary to have the finalize method as a safety net.

How did you kill the TaskManagers? I assume you didn't kill the JVM process because otherwise you wouldn't see the finalizer objects piling up.

I think that you can create a JIRA issue for removing the finalizer method.

Cheers,
Till



On Thu, Sep 14, 2017 at 12:26 PM, Fabian Hueske <[hidden email]> wrote:
Hi Steven,

thanks for reporting this issue.
Looping in Till who's more familiar with the task lifecycles.

Thanks, Fabian

2017-09-12 7:08 GMT+02:00 Steven Wu <[hidden email]>:
Hi ,

I was using Chaos Monkey to test Flink's behavior against frequent killing of task manager nodes. I found that stopped/disposed StreamTask got retained by java finalizer. It is kind like a memory leak. Since each StreamTask retains 2.6 MB memory. With 20 kills (and job restarts) for 8-CPU container, there are 2.6 * 20 * 8 MB retained in heap.

Inline image 1

finalize() is generally not recommended for cleanup, because "Finalizers are unpredictable, often dangerous, and generally unnecessary", quoted from Joshua Bloch's book.

This code from StreamTask.java seems to be the cause. Is it necessary? Can it be removed? We are using the flink-1.2 release branch, but I see the same code in the flink-1.3 and master branches.

/**
 * The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original
 * shutdown method was never called.
 *
 * <p>
 * This should not be relied upon! It will cause shutdown to happen much later than if manual
 * shutdown is attempted, and cause threads to linger for longer than needed.
 */
@Override
protected void finalize() throws Throwable {
    super.finalize();
    if (timerService != null) {
        if (!timerService.isTerminated()) {
            LOG.info("Timer service is shutting down.");
            timerService.shutdownService();
        }
    }

    cancelables.close();
}

Thanks,
Steven




Re: heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

Steven Wu
Stephan, I agree that it is not a real memory leak. I haven't found it affecting the system, so it is just something odd for now.

But if the finalizer is not really necessary, why defer memory release with unpredictable behavior? Can the StreamTask stop() method take care of the cleanup work, so that we don't need to rely on finalize() or a PhantomReference?
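
A rough sketch of the explicit-cleanup idea, assuming a task whose only resource is a timer executor. ExplicitCleanupTask and its methods are hypothetical names for illustration, not Flink's actual StreamTask API. Cleanup runs in a finally block and is idempotent, so no finalizer is involved and the object can be collected as soon as it is unreachable.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a task; names are illustrative, not Flink's API. */
final class ExplicitCleanupTask implements AutoCloseable {

    // Stand-in for the timer service discussed in the thread.
    private final ScheduledExecutorService timerService =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Runs the task body and always releases resources afterwards. */
    void invoke(Runnable body) {
        try {
            body.run();
        } finally {
            close(); // runs on normal return and when the body throws
        }
    }

    /** Idempotent: safe to call from invoke(), from a cancel path, or both. */
    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            timerService.shutdownNow();
        }
    }

    /** Reports whether the timer executor has been shut down. */
    boolean isTimerShutDown() {
        return timerService.isShutdown();
    }

    public static void main(String[] args) {
        ExplicitCleanupTask task = new ExplicitCleanupTask();
        task.invoke(() -> System.out.println("task body ran"));
        System.out.println("timer shut down: " + task.isTimerShutDown());
    }
}
```

The trade-off is that this covers only code paths that actually reach invoke() or close(); a fail-safe for paths that skip both would still need something like a reference queue.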






Re: heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

Steven Wu
How did you kill the TaskManagers? I assume you didn't kill the JVM process because otherwise you wouldn't see the finalizer objects piling up.

Till, I configured Chaos Monkey to always kill the newest/same TaskManager, so the other N-1 TaskManagers stayed up during the whole process. Each of them experienced a job restart for every kill, and that is where I saw the deferred memory cleanup by the finalizer.
