Job crash in job cluster mode

Tim Eckhardt

Hi there,

 

I have a problem running a Flink job in job cluster mode with Flink 1.11.1 (I also tried 1.11.2).

The same job runs fine in session cluster mode, and also in job cluster mode with Flink 1.10.0.

The job starts and runs for quite some time, but it runs a lot slower than in session cluster mode and crashes after about an hour. In the Flink dashboard I can see that the JVM heap stays constantly at a high level and slowly approaches the limit (4.13 GB in my case), which it reaches shortly before the job crashes.

There is also some G1_Old_Generation garbage collection going on, which I do not observe in session mode at all.

 

GC values after running for about 45min:

 

(Collector, Count, Time in ms)

G1_Young_Generation   1,250  107,937

G1_Old_Generation  322  2,432,362

 

Compared to the GC values of the same job in session cluster mode (after the same runtime):

 

G1_Young_Generation   1,920  20,575

G1_Old_Generation  0  0

 

So my vague guess is that it is something memory related, maybe a configuration issue.

 

To simplify the setup, only one JobManager and one TaskManager are used. The TaskManager has taskmanager.memory.process.size: 10000m configured, which should be totally fine for the server. The JobManager has a defined heap size of 1600m.
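
In flink-conf.yaml terms this boils down to roughly the following (the JobManager entry shown uses the legacy heap key, so treat the exact key name as approximate):

    taskmanager.memory.process.size: 10000m
    # JobManager heap (1600m), set via environment variable in my Docker setup;
    # legacy key name assumed here
    jobmanager.heap.size: 1600m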

 

Maybe somebody has experienced something like this before?

 

Also, is there a way to export the currently loaded configuration parameters of the job- and taskmanagers in a cluster? For example, I can’t see the current memory process size of the TaskManager in the Flink dashboard. That way I could compare the running and the crashing setup more easily (I’m using Docker and environment variables for configuration at the moment, which makes it a bit harder to debug).

 

Thanks.


Re: Job crash in job cluster mode

Matthias
Hi Tim,
I'm not aware of any memory-related issues tied to the deployment mode used. Have you checked the logs for hints? Additionally, you could try to extract a heap dump. That might help you analyze the cause of the memory consumption.
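
If it helps, one way to get a dump automatically at the moment of the crash is to pass the usual JVM flags to the TaskManager, e.g. via env.java.opts.taskmanager. A minimal sketch (untested; make sure the dump path points to a volume that survives the container):

    # flink-conf.yaml sketch: write a heap dump when the TaskManager hits an OOM
    env.java.opts.taskmanager: "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/taskmanager.hprof"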

The TaskManager and JobManager log the effective memory-related configuration during startup. Look for the "Preconfiguration" section in each of the log files to get a drill-down of how much memory is used per memory pool.
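
For comparing the two deployments it can also help to pin the derived pools explicitly instead of only setting the process size; the keys below are the kind of values that breakdown resolves (illustrative sketch only, not a recommendation for your workload):

    # flink-conf.yaml sketch of the pools listed in the startup breakdown (Flink 1.11)
    taskmanager.memory.process.size: 10000m
    taskmanager.memory.task.heap.size: 4g     # assumption: take the value from the logged breakdown
    taskmanager.memory.managed.size: 3g       # assumption
    taskmanager.memory.network.fraction: 0.1  # default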

Best,
Matthias

Re: Job crash in job cluster mode

rmetzger0
Hey Tim,

What is your Flink job doing? Is it restarting from time to time?
Is the JobManager crashing, or the TaskManager?

Re: Job crash in job cluster mode

Tim Eckhardt

Hi Robert, hi Matthias,

 

The job does some stateful stream processing (reading data from Kafka) and should run endlessly, so ideally there are no restarts at all.

The TaskManager is the one that crashes in the end, with this kind of exception:


    org.apache.kafka.common.errors.DisconnectException: null
    INFO  org.apache.kafka.clients.FetchSessionHandler                 [] - [Consumer clientId=consumer-flink-consumer-8, groupId=flink-consumer] Error sending fetch request (sessionId=338952654, epoch=1) to node 3: {}.

    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-2"

 

Thanks for the tip to look into the heap dump; I might do that when running the next experiment.

Best regards, Tim