Constant backpressure on flink job

6 messages

Constant backpressure on flink job

Monika Hristova
Hello,
 
We are experiencing regular backpressure (at least once a week). I would like to ask how we can fix it.
 
Our current configuration (/usr/lib/flink/conf/flink-conf.yaml) is:
# Settings applied by Cloud Dataproc initialization action
jobmanager.rpc.address: bonusengine-prod-m-0
jobmanager.heap.mb: 4096
jobmanager.rpc.port: 6123
taskmanager.heap.mb: 4096
taskmanager.memory.preallocate: false
taskmanager.numberOfTaskSlots: 8
#taskmanager.network.numberOfBuffers: 21952     # legacy deprecated
taskmanager.network.memory.fraction: 0.9
taskmanager.network.memory.min: 67108864
taskmanager.network.memory.max: 1073741824
taskmanager.memory.segment-size: 65536
parallelism.default: 52
web.port: 8081
web.timeout: 120000
heartbeat.interval: 10000
heartbeat.timeout: 100000
yarn.application-attempts: 10
high-availability: zookeeper
high-availability.zookeeper.quorum: bonusengine-prod-m-0:2181,bonusengine-prod-m-1:2181,bonusengine-prod-m-2:2181
high-availability.zookeeper.path.root: /flink
#high-availability.zookeeper.storageDir: hdfs:///flink/recovery     # legacy deprecated
high-availability.storageDir: hdfs:///flink/recovery
flink.partition-discovery.interval-millis=60000
fs.hdfs.hadoopconf: /etc/hadoop/conf
state.backend: rocksdb
state.checkpoints.dir: hdfs:///bonusengine/checkpoints/
state.savepoints.dir: hdfs:///bonusengine/savepoints
metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: 127.0.0.1
metrics.reporter.stsd.port: 8125
zookeeper.sasl.disable: true
yarn.reallocate-failed: true
yarn.maximum-failed-containers: 32
web.backpressure.refresh-interval: 60000
web.backpressure.num-samples: 100
web.backpressure.delay-between-samples: 50
 
This runs on a Hadoop HA cluster: masters with 8 vCPUs and 7.2 GB RAM, and slaves with 16 vCPUs and 60 GB RAM, with the YARN configuration in the attached yarn-site.xml.
 
We have one YARN session started, in which 8 jobs run. Three of them consume the same source (Kafka), which is causing the backpressure, but only one of them experiences backpressure. The state of the job is around 20 MB, and checkpointing is configured as follows:
checkpointing.interval=300000
# makes sure value in ms of progress happens between checkpoints
checkpointing.pause_between_checkpointing=240000
# checkpoints have to complete within value in ms or are discarded
checkpointing.timeout=60000
# allows given number of checkpoints to be in progress at the same time
checkpointing.max_concurrent_checkpoints=1
# enables/disables externalized checkpoints which are retained after job cancellation
checkpointing.externalized_checkpoints.enabled=true
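(Those are application-level keys; roughly, they are assumed to map onto Flink's standard CheckpointConfig calls as in the simplified sketch below, which is an illustration rather than our exact setup code.)

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpointing.interval=300000: trigger a checkpoint every 5 minutes
        env.enableCheckpointing(300_000);

        CheckpointConfig config = env.getCheckpointConfig();

        // checkpointing.pause_between_checkpointing=240000: make sure 4 minutes of
        // progress happen between the end of one checkpoint and the start of the next
        config.setMinPauseBetweenCheckpoints(240_000);

        // checkpointing.timeout=60000: checkpoints have to complete within 1 minute
        // or they are discarded ("Checkpoint expired before completing")
        config.setCheckpointTimeout(60_000);

        // checkpointing.max_concurrent_checkpoints=1: only one checkpoint in flight
        config.setMaxConcurrentCheckpoints(1);

        // checkpointing.externalized_checkpoints.enabled=true: retain checkpoints
        // after the job is cancelled
        config.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}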
 
The checkpointing pause was increased and the timeout was reduced on multiple occasions, because the job kept failing, unable to recover from backpressure. RocksDB is configured as the state backend. The problem keeps reproducing even with a one-minute timeout. I would also like to point out that the ideal checkpoint interval for this job would be 2 minutes.
I would like to ask what the problem might be, or at least how to trace it.
 
Best Regards,
Monika Hristova


Attachment: yarn-site.xml (13K)

Re: Constant backpressure on flink job

Dawid Wysakowicz-2

Hi Monika,

I would start by identifying the operator that causes the backpressure. You can find more information on how to monitor backpressure in the docs [1]. You might also be interested in Seth's (cc'ed) webinar [2], where he also talks about how to debug backpressure.

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/back_pressure.html#monitoring-back-pressure

[2] https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
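If you want to sample it from a script instead of the web UI, the same back pressure information is exposed through the monitoring REST API under /jobs/:jobid/vertices/:vertexid/backpressure. A rough, untested sketch (the REST address, job id and vertex id are passed as arguments; take the ids from /jobs/overview and /jobs/<jobid>):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Polls the back pressure endpoint of one job vertex a few times, because the first
// request typically only triggers the sampling and results may not be ready until a
// later poll.
public class BackpressureProbe {

    public static void main(String[] args) throws Exception {
        String restAddress = args[0]; // e.g. http://bonusengine-prod-m-0:8081
        String jobId = args[1];
        String vertexId = args[2];

        String url = restAddress + "/jobs/" + jobId
                + "/vertices/" + vertexId + "/backpressure";

        for (int i = 0; i < 5; i++) {
            System.out.println(httpGet(url));
            Thread.sleep(5_000);
        }
    }

    private static String httpGet(String url) throws Exception {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("GET");
        StringBuilder body = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        }
        return body.toString();
    }
}

The response contains a per-subtask back pressure ratio, which should point you at the operator to look at first.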


RE: Constant backpressure on flink job

Monika Hristova

Hello Dawid,

 

Thank you for your response.

What I actually noticed is that, first, checkpoints start failing with the exception “Checkpoint expired before completing”. They keep failing, and then (after an hour or so) the backpressure occurs. My job graph is attached (JobGraph_1), with the problematic operators marked in red. After that, the backpressure occurs, and the operators marked in red in JobGraph_2 are the ones showing high backpressure.

 

Best Regards,

Monika

 

 

Attachments: JobGraph_1.png (63K), JobGraph_2.png (76K)

FW: Constant backpressure on flink job

Georgi Stoyanov

Hi guys,

 

Could you help us find out why our checkpoints fail so often (we don't have a huge state at all)?

We have been experiencing this problem for around two months, ever since upgrading to 1.7.2 and configuring incremental checkpointing. Could this be the reason?
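(For reference, incremental checkpointing with the RocksDB backend is enabled roughly as in the sketch below; this is an illustration of the mechanism, not necessarily our exact code. The equivalent config key is state.backend.incremental: true.)

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalRocksDbSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // The second constructor argument enables incremental checkpoints: only the
        // RocksDB files created since the last checkpoint are uploaded to HDFS.
        env.setStateBackend(
                new RocksDBStateBackend("hdfs:///bonusengine/checkpoints/", true));
    }
}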

 

Regards,

Georgi Stoyanov

 


Re: FW: Constant backpressure on flink job

Dawid Wysakowicz-2

Hi,

Have you checked the logs for exceptions? Could you share the logs with us? Have you tried switching to, e.g., the FsStateBackend, or disabling incremental checkpoints on 1.7.2, to see in which configuration the problem occurs?
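For that experiment, a sketch along these lines should be enough (the checkpoint URI below is taken from your flink-conf.yaml; adjust as needed):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendExperimentSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Variant 1: keep RocksDB but switch incremental checkpoints off.
        env.setStateBackend(
                new RocksDBStateBackend("hdfs:///bonusengine/checkpoints/", false));

        // Variant 2: with only ~20 MB of state, the heap-based FsStateBackend is an
        // option as well (full snapshots of the state to HDFS).
        // env.setStateBackend(new FsStateBackend("hdfs:///bonusengine/checkpoints/"));
    }
}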

Best,

Dawid


Re: FW: Constant backpressure on flink job

Till Rohrmann
In reply to this post by Georgi Stoyanov
Hi Monika and Georgi,

It is quite hard to debug this problem remotely, because one would need access to the logs at DEBUG log level. Additionally, it would be great to have a better understanding of what the individual operators do.

In general, 20 MB of state should be super easy for Flink to handle (independent of incremental vs. full checkpoints). If the problem only occurred after upgrading to Flink 1.7.2 and enabling incremental checkpoints, I would try disabling incremental checkpoints, or enabling incremental checkpoints with the older Flink version. Which version did you use before?

Usually, if a checkpoint expires before it can be completed, this means that either the checkpoint operations take too long or the propagation of the checkpoint barriers through the topology is slow. The former can be caused, for example, by problems when writing to HDFS (retries, an overloaded HDFS, a slow network connection). The latter can be caused by operators that are slow at processing the stream records (e.g. blocking calls to an external system, too few resources, complex computation for each record).
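To make the last point a bit more concrete, an operator of the following (entirely made-up) shape is already enough to delay barrier propagation, because the checkpoint barrier can only be forwarded once the synchronous call for the current record has returned:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical example of the kind of operator that slows down barrier propagation:
// every single record triggers a blocking call to an external service.
public class BlockingEnrichment extends RichMapFunction<String, String> {

    private transient ExternalClient client; // stand-in for a real external system client

    @Override
    public void open(Configuration parameters) {
        this.client = new ExternalClient();
    }

    @Override
    public String map(String event) throws Exception {
        // While this call is in flight, the subtask can neither process further
        // records nor forward the checkpoint barrier downstream.
        return client.lookup(event);
    }

    // Dummy client that simulates a slow remote lookup.
    static class ExternalClient {
        String lookup(String key) throws InterruptedException {
            Thread.sleep(50);
            return key;
        }
    }
}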

Cheers,
Till
