Checkpoints very slow with high backpressure


Checkpoints very slow with high backpressure

Yassine MARZOUGUI
Hi all,

I have a streaming pipeline as follows:
1 - read a folder continuously from HDFS
2 - filter duplicates (using keyBy(x -> x) and keeping a state per key indicating whether it has been seen)
3 - schedule some future actions on the stream using a ProcessFunction and processing-time timers (elements are kept in a MapState)
4 - write results back to HDFS using a BucketingSink.
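Step 2 can be illustrated with a small plain-Java model. This is a sketch under assumptions, not Flink code: a `HashMap` stands in for the keyed state that `keyBy(x -> x)` would scope to each element, and the class name `SeenFilter` is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model of keyBy(x -> x) plus a per-key "seen" flag. Not Flink API. */
class SeenFilter<T> {
    // Stand-in for keyed state: one boolean flag per distinct key (here, the element itself).
    private final Map<T, Boolean> seen = new HashMap<>();

    /** Returns true only the first time an element is observed. */
    boolean firstTime(T element) {
        // putIfAbsent returns null when the key had no flag yet, i.e. the element is new.
        return seen.putIfAbsent(element, Boolean.TRUE) == null;
    }

    /** Keeps only first occurrences, in arrival order. */
    List<T> filter(List<T> input) {
        List<T> out = new ArrayList<>();
        for (T element : input) {
            if (firstTime(element)) {
                out.add(element);
            }
        }
        return out;
    }

    /** Number of keys held in the "state"; it grows with every distinct element. */
    int stateSize() {
        return seen.size();
    }
}
```

Since the flag is never cleared, the model holds one entry per distinct element seen so far, which is why the size of the input bounds the size of the state.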

I am using RocksDBStateBackend and Flink 1.3-SNAPSHOT (commit: 9fb074c).

Currently the source contains just one file of 1 GB, so that's the maximum state the job might hold. I noticed that the backpressure on operators #1 and #2 is high, and the split reader has only read 60 MB of the 1 GB source file. I suspect this is because the ProcessFunction is slow (on purpose). However, it looks like this has affected the checkpoints, which are failing after the timeout (set to 2 hours); see the attached screenshot.



In the JobManager logs I keep getting warnings:

2017-04-23 19:32:38,827 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 8 from 210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.
Is the high backpressure the cause of the checkpoints being so slow? If so, is there a way to disable the backpressure mechanism, given that the records will be buffered in the RocksDB state anyway, which is backed by disk?

Thank you.

Best,
Yassine


Fwd: Checkpoints very slow with high backpressure

Yassine MARZOUGUI
---------- Forwarded message ----------
From: "Yassine MARZOUGUI" <[hidden email]>
Date: Apr 23, 2017 20:53
Subject: Checkpoints very slow with high backpressure
To: <[hidden email]>
Cc:



Re: Checkpoints very slow with high backpressure

Yassine MARZOUGUI
I'm sorry, guys, if you received multiple copies of this mail. I kept trying to send it yesterday, but it looks like the mailing list was stuck and didn't dispatch it until now. Sorry for the disturbance.

On Apr 23, 2017 20:53, "Yassine MARZOUGUI" <[hidden email]> wrote:


Re: Checkpoints very slow with high backpressure

Rune Skou Larsen

Sorry I can't help you, but we're also experiencing slow checkpointing when there is backpressure from the sink.

I tried HDFS, S3, and RocksDB state backends, but to no avail: checkpointing always times out under backpressure.

Can we somehow reduce Flink's internal buffer sizes, so that checkpointing with backpressure becomes faster?

- Rune

---

Our current setup (improvement suggestions welcome!):

Flink 1.2.0, YARN on AWS EMR, 1 master + 3 slaves, m4.xlarge

program_parallelism: 12
taskmanagers: 6
slotsPerTaskManager: 4
taskmanager_heap_mb: 4096
jobmanager_heap_mb: 1024

Basic program structure:

1) read batch from Kinesis

2) Split batch and shuffle using custom partitioner (consistent hashing).

3) enrich using external REST service

4) Write to database (This step is the bottleneck)
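Step 2 above mentions a custom partitioner based on consistent hashing. Below is a minimal plain-Java sketch of that technique. It assumes CRC32 as the hash function and a fixed number of virtual nodes per partition; both choices are illustrative and not taken from Rune's code. The `partition(key, numPartitions)` method has the same shape as Flink's `Partitioner#partition`, but the class does not implement that interface.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

/** Hypothetical consistent-hashing partitioner in plain Java (illustrative names). */
class ConsistentHashPartitioner {
    private final int virtualNodes;
    // One ring per partition count, built lazily and reused.
    private final Map<Integer, TreeMap<Long, Integer>> rings = new HashMap<>();

    ConsistentHashPartitioner(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    private static long hash(String s) {
        CRC32 crc = new CRC32();
        crc.update(s.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    /** Places each partition at several points ("virtual nodes") on the ring. */
    private TreeMap<Long, Integer> buildRing(int numPartitions) {
        TreeMap<Long, Integer> ring = new TreeMap<>();
        for (int p = 0; p < numPartitions; p++) {
            for (int v = 0; v < virtualNodes; v++) {
                ring.put(hash("partition-" + p + "#" + v), p);
            }
        }
        return ring;
    }

    /** Walks clockwise from the key's hash to the next partition point, wrapping around. */
    int partition(String key, int numPartitions) {
        TreeMap<Long, Integer> ring = rings.computeIfAbsent(numPartitions, this::buildRing);
        Map.Entry<Long, Integer> owner = ring.ceilingEntry(hash(key));
        return (owner != null ? owner : ring.firstEntry()).getValue();
    }
}
```

Consistent hashing is useful here because changing the number of partitions only remaps the keys that fall on the new partition's arcs of the ring. A plain `hash % n` scheme would instead reshuffle almost every key.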

On 24-04-2017 09:32, Yassine MARZOUGUI wrote:
--

Venlig hilsen/Best regards Rune Skou Larsen

goto Trifork Public A/S Dyssen 1 · DK-8200 Aarhus N · Denmark Phone +45 3160 2497 Skype: rsltrifork Twitter: RuneSkouLarsen


Re: Checkpoints very slow with high backpressure

Ufuk Celebi
@Yassine: no, there is no way to disable the back pressure mechanism. Do you have more details about the last two operators? What do you mean by the process function being slow on purpose?

@Rune: with 1.3, Flink will configure the internal buffers so that not too much data is buffered in them (https://issues.apache.org/jira/browse/FLINK-4545). You could try the current master and check whether it improves the checkpointing behaviour under back pressure. Out of curiosity, are you using the async I/O API for the communication with the external REST service (https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html)?
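A toy model shows why the amount of buffered data matters. It assumes aligned, in-band checkpoint barriers, meaning a barrier is processed only after every record queued ahead of it in the same channel. It also assumes a single FIFO channel feeding a slow operator. The class and numbers are hypothetical, and this is not Flink code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model: how long an in-band barrier waits behind buffered records. Not Flink code. */
class BarrierDelayModel {
    private static final String BARRIER = "BARRIER";

    /**
     * @param bufferedRecords  records already queued ahead of the barrier (full buffers under back pressure)
     * @param recordsPerSecond throughput of the slow downstream operator
     * @return seconds until the slow operator reaches the barrier
     */
    static double secondsUntilBarrierProcessed(int bufferedRecords, double recordsPerSecond) {
        Deque<String> channel = new ArrayDeque<>();
        for (int i = 0; i < bufferedRecords; i++) {
            channel.addLast("record-" + i);
        }
        channel.addLast(BARRIER); // the barrier is appended behind the backlog (FIFO)
        int processed = 0;
        // The operator handles elements strictly in order, so it must drain the backlog first.
        while (!BARRIER.equals(channel.pollFirst())) {
            processed++;
        }
        return processed / recordsPerSecond;
    }
}
```

Under these assumptions, the wait equals the buffered records divided by the operator's throughput. Keeping fewer records in flight shortens the wait proportionally. Raising the checkpoint timeout only gives the barrier more time to get through.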

– Ufuk
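Async I/O helps the enrichment step because request latencies overlap instead of adding up. The sketch below shows that idea with `CompletableFuture` and a thread pool. It is plain Java rather than Flink's `AsyncFunction`/`AsyncDataStream` API, and `fakeRestCall` is a hypothetical stand-in that just sleeps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/** Plain-Java illustration of overlapping slow enrichment calls; not Flink's AsyncFunction. */
class AsyncEnrichmentSketch {
    /** Hypothetical stand-in for a REST call that takes latencyMillis to answer. */
    static String fakeRestCall(String input, long latencyMillis) {
        try {
            Thread.sleep(latencyMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return input + ":enriched";
    }

    /** One request at a time: total time is roughly n * latency. */
    static List<String> enrichSequentially(List<String> inputs, long latencyMillis) {
        List<String> out = new ArrayList<>();
        for (String in : inputs) {
            out.add(fakeRestCall(in, latencyMillis));
        }
        return out;
    }

    /** Up to `concurrency` requests in flight; results are returned in input order. */
    static List<String> enrichConcurrently(List<String> inputs, long latencyMillis, int concurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        try {
            // Collecting to a list submits every request before we start waiting.
            List<CompletableFuture<String>> futures = inputs.stream()
                    .map(in -> CompletableFuture.supplyAsync(() -> fakeRestCall(in, latencyMillis), pool))
                    .collect(Collectors.toList());
            // join() waits for each result; iterating the list preserves input order.
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }
}
```

With 8 inputs, 50 ms of latency each, and 8 threads, the sequential version needs roughly 400 ms. The concurrent one needs roughly one latency period plus scheduling overhead. Flink's async I/O operator applies the same idea inside the stream and caps the number of in-flight requests with a capacity parameter.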


On Mon, Apr 24, 2017 at 11:08 AM, Rune Skou Larsen <[hidden email]> wrote:




Re: Checkpoints very slow with high backpressure

Yassine MARZOUGUI
Hi Ufuk,

The ProcessFunction receives elements and buffers them in a MapState, and periodically (for example every x seconds) registers processing-time timers (according to rules it gets from a connected rule stream). When a timer fires, I pop the next element from the state, send a request to an external server, and collect the response.
The requests to the external server should happen periodically rather than continuously; that's why I control them with timers and buffer the elements in RocksDB state.
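The pattern described here (buffer elements in state, pop one element per processing-time timer) can be modeled in plain Java with a logical clock. The model makes these assumptions:
- a single key
- a `LinkedHashMap` standing in for the `MapState`
- a `PriorityQueue` standing in for the timer service
- a hypothetical `externalCall` function in place of the request to the external server

None of it is Flink API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.Function;

/** Hypothetical single-key model of "buffer in state, pop one element per timer". Not Flink API. */
class TimerDrivenBuffer {
    private final Map<Long, String> buffer = new LinkedHashMap<>();   // stand-in for MapState<Long, String>
    private final PriorityQueue<Long> timers = new PriorityQueue<>(); // stand-in for processing-time timers
    private final Function<String, String> externalCall;              // hypothetical external request
    private final List<String> emitted = new ArrayList<>();
    private long nextSeq = 0;

    TimerDrivenBuffer(Function<String, String> externalCall) {
        this.externalCall = externalCall;
    }

    /** Cheap per-element work: just append to the buffer. */
    void onElement(String element) {
        buffer.put(nextSeq++, element);
    }

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the logical clock and fires every timer due at or before `now`. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek() <= now) {
            timers.poll();
            onTimer();
        }
    }

    /** Pops the oldest buffered element (insertion order), calls out, and emits the response. */
    private void onTimer() {
        Iterator<Map.Entry<Long, String>> it = buffer.entrySet().iterator();
        if (it.hasNext()) {
            String next = it.next().getValue();
            it.remove();
            emitted.add(externalCall.apply(next));
        }
    }

    int buffered() {
        return buffer.size();
    }

    List<String> emitted() {
        return emitted;
    }
}
```

In this model, the output rate depends on how often timers are registered (one element per timer), not on how fast elements arrive. The buffer therefore keeps growing whenever input outpaces the timers.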

2017-04-24 13:48 GMT+02:00 Ufuk Celebi <[hidden email]>:




Re: Checkpoints very slow with high backpressure

rhashmi
So what is the resolution? I have a use case where Flink consumes messages from Kafka. Flink went down about a day ago, so now it has to process 24 hours' worth of events. But I hit backpressure, and as of right now checkpoints are timing out. Is there any recommendation on how to handle this situation?

It seems like triggers are also not firing, so no updates are being made to the downstream database.

Is there a recommended approach to handling backpressure?

Version: Flink 1.2.


Re: Checkpoints very slow with high backpressure

SHI Xiaogang
Hi rhashmi

We are also experiencing slow checkpoints when there is back pressure. It seems there is no good way to handle back pressure right now.

We work around it by setting a larger checkpoint timeout. The default value is 10 minutes, but checkpoints usually take longer to complete when there is back pressure. You can set it via `CheckpointConfig#setCheckpointTimeout()`.
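Below is a toy model of what the timeout controls, matching the "Received late message for now expired checkpoint attempt" warning quoted at the start of the thread. A pending checkpoint completes only if every task acknowledges it within the timeout. Acknowledgements that arrive afterwards are discarded as late. The class is hypothetical and is not Flink's `CheckpointCoordinator`. As a simplification, it checks expiry lazily when an acknowledgement arrives. The 10-minute default mentioned above corresponds to 600,000 ms.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of checkpoint expiry; not Flink's CheckpointCoordinator. */
class CheckpointTimeoutModel {
    enum AckResult { PENDING, COMPLETED, LATE }

    // In a Flink job the real knob is env.getCheckpointConfig().setCheckpointTimeout(millis).
    private final long timeoutMillis;
    private final int numTasks;
    private final Map<Long, Long> triggerTime = new HashMap<>();
    private final Map<Long, Set<Integer>> acks = new HashMap<>();

    CheckpointTimeoutModel(long timeoutMillis, int numTasks) {
        this.timeoutMillis = timeoutMillis;
        this.numTasks = numTasks;
    }

    void trigger(long checkpointId, long now) {
        triggerTime.put(checkpointId, now);
        acks.put(checkpointId, new HashSet<>());
    }

    AckResult acknowledge(long checkpointId, int taskIndex, long now) {
        Long started = triggerTime.get(checkpointId);
        if (started == null || now - started > timeoutMillis) {
            // Unknown or expired checkpoint: the ack is too late to count, and the attempt is dropped.
            triggerTime.remove(checkpointId);
            acks.remove(checkpointId);
            return AckResult.LATE;
        }
        Set<Integer> received = acks.get(checkpointId);
        received.add(taskIndex);
        if (received.size() == numTasks) {
            // Every task acknowledged in time: the checkpoint completes.
            triggerTime.remove(checkpointId);
            acks.remove(checkpointId);
            return AckResult.COMPLETED;
        }
        return AckResult.PENDING;
    }
}
```

In this model, a longer timeout only helps if the slowest task's acknowledgement eventually arrives within it. It does nothing to make that acknowledgement arrive sooner.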

Regards,
Xiaogang



2017-06-01 5:36 GMT+08:00 rhashmi <[hidden email]>:


Re: Checkpoints very slow with high backpressure

Chen Qin
What is the root cause of the back pressure?
The reason I ask is that we investigated, added metrics to measure the time to process each event, and ended up finding the bottleneck in frequent managed state updates. Our approach was to keep an in-memory cache and periodically update the state before the checkpointing cycle kicks in.

This thread might be somewhat related.
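Chen's approach can be sketched as a write-behind cache. Updates go to an in-memory map on the hot path, and accumulated values are pushed into state once per key before a snapshot is taken. The class below is hypothetical plain Java, with a `HashMap` standing in for managed state. In a Flink job the flush would presumably be triggered from a checkpoint hook such as `CheckpointedFunction#snapshotState`, though the thread does not say how Chen's team wired it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical write-behind cache in front of a state store; names are illustrative. */
class WriteBehindCounter {
    private final Map<String, Long> cache = new HashMap<>(); // in-memory only, not checkpointed
    private final Map<String, Long> state = new HashMap<>(); // stand-in for managed state
    private int stateWrites = 0;

    /** Hot path: only touches the in-memory cache. */
    void add(String key, long delta) {
        cache.merge(key, delta, Long::sum);
    }

    /** Pushes cached deltas into state, one write per dirty key; must run before each snapshot. */
    void flush() {
        for (Map.Entry<String, Long> e : cache.entrySet()) {
            state.merge(e.getKey(), e.getValue(), Long::sum);
            stateWrites++;
        }
        cache.clear();
    }

    long stateValue(String key) {
        return state.getOrDefault(key, 0L);
    }

    int stateWrites() {
        return stateWrites;
    }
}
```

The flush has to happen before the state is snapshotted. Anything still sitting only in the cache is not part of the checkpoint.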

Chen

On Wed, May 31, 2017 at 7:19 PM, SHI Xiaogang <[hidden email]> wrote:



Re: Checkpoints very slow with high backpressure

rhashmi
I tried extending the timeout to 1 hour, but no luck. It is still timing out and there is no exception in the log file, so I am guessing something is stuck. I will dig further.

Here are the configuration details:

Standalone cluster, with checkpoints stored in S3.

I just have 217680 messages in 24 partitions.

Any idea?

Re: Checkpoints very slow with high backpressure

rhashmi
I enabled INFO logging. It seems to be stuck:


==> /mnt/ephemeral/logs/flink-flink-jobmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:45:18,229 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1496321118221

==> /mnt/ephemeral/logs/flink-flink-taskmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:45:18,237 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@79e68dd3 for Async calls on Source: Custom Source (2/12)
2017-06-01 12:45:18,237 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@78da1e82 for Async calls on Source: Custom Source (5/12)
2017-06-01 12:45:18,238 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@68bff79e for Async calls on Source: Custom Source (8/12)
2017-06-01 12:45:18,238 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@600bdc29 for Async calls on Source: Custom Source (11/12)
2017-06-01 12:45:24,853 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumertest ::
2017-06-01 12:45:24,853 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumertest ::
2017-06-01 12:45:24,853 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumertest ::
2017-06-01 12:45:24,854 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumertest ::
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 4
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2017-06-01 12:45:24,860 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,860 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,860 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,862 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability, zookeeper
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.quorum, host1:2181,host2:2181
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.storageDir, s3://somelocation/ha-recovery/
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 4
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability, zookeeper
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.quorum, host1:2181,host2:2181
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.storageDir, s3://somelocation/ha-recovery/
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,896 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,896 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:24,902 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumer ::
2017-06-01 12:45:24,905 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumer ::
2017-06-01 12:45:24,909 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,909 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 4
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability, zookeeper
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.quorum, host1:2181,host2:2181
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.storageDir, s3://somelocation/ha-recovery/
2017-06-01 12:45:24,911 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,911 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,911 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:24,915 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumer ::
2017-06-01 12:45:24,916 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumer ::
2017-06-01 12:45:24,923 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 4
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability, zookeeper
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.quorum, host1:2181,host2:2181
2017-06-01 12:45:24,925 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.storageDir, s3://somelocation/ha-recovery/
2017-06-01 12:45:24,925 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,925 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,925 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:25,187 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source (11/12)
2017-06-01 12:45:25,188 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source (2/12)
2017-06-01 12:45:25,196 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source (5/12)
2017-06-01 12:45:25,197 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumertest ::
2017-06-01 12:45:25,203 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source (8/12)
2017-06-01 12:45:25,227 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumer ::
2017-06-01 12:45:25,257 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumertest ::
2017-06-01 12:45:25,277 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumer ::

==> /mnt/ephemeral/logs/flink-flink-client-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:45:45,350 WARN  org.apache.flink.runtime.client.JobSubmissionClientActor      - Discard message LeaderSessionMessage(null,ConnectionTimeout) because the expected leader session ID 2d2a8eac-b837-4605-93cc-81720247f247 did not equal the received leader session ID null.

==> /mnt/ephemeral/logs/flink-flink-jobmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:55:18,229 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 1 expired before completing.
2017-06-01 12:55:18,233 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1496321718230

==> /mnt/ephemeral/logs/flink-flink-taskmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:55:18,235 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@44074ae6 for Async calls on Source: Custom Source (2/12)
2017-06-01 12:55:18,235 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@463dc5a1 for Async calls on Source: Custom Source (5/12)
2017-06-01 12:55:18,236 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@7871a1bb for Async calls on Source: Custom Source (8/12)
2017-06-01 12:55:18,237 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@57df8c1d for Async calls on Source: Custom Source (11/12)

==> /mnt/ephemeral/logs/flink-flink-jobmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:58:30,764 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 1 from c601dd04affa7da13a226daa222062e7 of job 303656ace348131ed7a38bb02b4fe374.

Re: Checkpoints very slow with high backpressure

rhashmi
Never mind, I found it. The backpressure was caused by an AWS RDS MySQL instance.

Re: Checkpoints very slow with high backpressure

Edward
I read through this thread and didn't see any resolution to the slow
checkpoint issue (just that someone resolved their backpressure issue).

We are experiencing the same problem:
- When there is no backpressure, checkpoints take less than 100ms
- When there is high backpressure, checkpoints take anywhere from 5 minutes
to 25 minutes.

This is preventing us from using the checkpointing feature at all, since
periodic backpressure is unavoidable.

We are experiencing this when running on Flink 1.4.0.
We are retaining only a single checkpoint, and the size of retained
checkpoint is less than 250KB, so there's not a lot of state.
   state.backend: jobmanager
   state.backend.async: true
   state.backend.fs.checkpointdir: hdfs://checkpoints
   state.checkpoints.num-retained: 1
   max concurrent checkpoints: 1
   checkpointing mode: AT_LEAST_ONCE

One other data point: if I rewrite the job to allow chaining all steps (i.e.
same parallelism on all steps, so they fit in 1 task slot), the checkpoints
are still slow under backpressure, but are an order of magnitude faster --
they take about 60 seconds rather than 15 minutes.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Checkpoints very slow with high backpressure

Piotr Nowojski
Hi,

If I’m not mistaken, this is a known issue that we have been working to resolve for the Flink 1.5 release. The problem is that under backpressure, data is buffered between nodes. Because the checkpoint barrier travels in line with the records, all of that buffered data must be processed before the checkpoint can complete. This is especially problematic if processing a single record takes (or can take) a significant amount of time.
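Below is a toy Python model of the effect described above. It is not Flink code. One input channel is treated as a FIFO queue, and the checkpoint barrier is enqueued behind whatever records are already buffered. The model assumes a fixed processing cost per record, which is a simplification; the function name is hypothetical.

```python
from collections import deque

BARRIER = object()  # sentinel standing in for a checkpoint barrier


def ms_until_barrier(buffered_records, ms_per_record):
    """Processing time before the operator reaches the barrier on one channel."""
    channel = deque(range(buffered_records))  # records already queued
    channel.append(BARRIER)                   # barrier enqueued behind them
    elapsed_ms = 0
    while True:
        item = channel.popleft()
        if item is BARRIER:
            return elapsed_ms
        elapsed_ms += ms_per_record  # every queued record must be processed first


print(ms_until_barrier(0, 50))       # 0
print(ms_until_barrier(10_000, 50))  # 500000
```

Take an assumed 10,000 buffered records at 50 ms each. The barrier then waits about 8 minutes, while an empty channel lets it through immediately. The delay scales with the amount of queued data, not with the size of the state.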

With Flink 1.5 we introduced a mechanism to better control the amount of buffered data, which should address this issue (Flink 1.5 should be released within a couple of weeks).
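The mechanism is not named here. To my understanding, the 1.5 change is credit-based network flow control. In that scheme a receiver announces its free buffers as credit, and the sender transmits only while it holds credit.

The sketch below is a simplified model of that idea, not Flink's implementation. It uses hypothetical names, a single channel and discrete rounds. It shows that the receiver's queue can never exceed the buffers it owns, so the data a barrier can get stuck behind is bounded.

```python
def peak_queue(backlog, receiver_buffers, rounds, processed_per_round):
    """Largest number of records ever queued at the receiver.

    Invariant: free + credit + queued == receiver_buffers, so the queue
    can never grow beyond the buffers the receiver owns.
    """
    free, credit, queued, peak = receiver_buffers, 0, 0, 0
    for _ in range(rounds):
        # Receiver announces its free buffers to the sender as credit.
        credit, free = credit + free, 0
        # Sender transmits only as many records as it has credit for.
        sent = min(credit, backlog)
        credit, backlog, queued = credit - sent, backlog - sent, queued + sent
        peak = max(peak, queued)
        # Slow consumer processes a few records, returning their buffers.
        done = min(processed_per_round, queued)
        queued, free = queued - done, free + done
    return peak


print(peak_queue(backlog=1_000_000, receiver_buffers=8, rounds=100, processed_per_round=1))  # 8
```

The backlog here is a million records and the consumer handles one record per round. Even so, the peak queue stays at the 8 assumed buffers.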

In the meantime, you could try out the Flink 1.5 release candidate that has just been published, or you could try reducing the number of configured network buffers. Keep in mind, however, that at some point this can decrease your maximum throughput.
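Here is a back-of-the-envelope estimate of why fewer buffers shorten the wait. It assumes every input buffer is full of equally sized records and that the bottleneck drains them at a constant rate. The function name and all numbers are hypothetical.

32 KiB is used as the buffer size because it is Flink's default memory segment size. Only receiver-side buffers are counted, although sender-side buffers also hold in-flight data.

```python
def worst_case_wait_s(channels, buffers_per_channel, buffer_bytes,
                      record_bytes, records_per_s):
    """Seconds a barrier may wait behind completely full input buffers."""
    in_flight_bytes = channels * buffers_per_channel * buffer_bytes
    in_flight_records = in_flight_bytes / record_bytes
    return in_flight_records / records_per_s  # drained at the bottleneck rate


KIB = 1024
print(worst_case_wait_s(12, 2, 32 * KIB, 1 * KIB, 100))  # 7.68
print(worst_case_wait_s(12, 1, 32 * KIB, 1 * KIB, 100))  # 3.84
```

Halving the buffers halves the worst-case wait in this model. The likely cost is less room to overlap network transfer with processing, which is where the throughput penalty comes from.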


On the other hand, why does it prevent you from using checkpointing at all?

Piotr Nowojski 

On 5 Apr 2018, at 06:10, Edward <[hidden email]> wrote:


Re: Checkpoints very slow with high backpressure

Edward
Thanks for the update, Piotr.

The reason it prevents us from using checkpoints is this:
We are relying on the checkpoints to trigger the commit of Kafka offsets for our
sources (Kafka consumers).
When there is no backpressure this works fine. When there is backpressure,
checkpoints fail because they take too long, and our Kafka offsets are never
committed to the Kafka brokers (as we just learned the hard way).

Normally there is no backpressure in our jobs, but after an outage the jobs
do experience backpressure while catching up. And when you're already trying
to recover from an incident, that is not the ideal time for Kafka offset
commits to stop working.





Re: Checkpoints very slow with high backpressure

Piotr Nowojski
Thanks for the explanation.

I hope that either 1.5 will solve your issue (please let us know if it doesn’t!) or, if you can’t wait, that decreasing the number of network buffers can mitigate the problem.

Piotrek

> On 5 Apr 2018, at 08:13, Edward <[hidden email]> wrote:

Re: Checkpoints very slow with high backpressure

Zhijiang(wangzhijiang999)
Backpressure does indeed delay checkpoints, because in-flight network buffers gradually accumulate ahead of the barriers before barrier alignment.
As Piotr explained, 1.5 improves this to some extent.
After 1.5 we plan to speed up checkpoints further by controlling the channel reader to improve barrier alignment; this has already been verified to greatly decrease alignment time in backpressure scenarios.
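Below is a simplified Python model of barrier alignment. It uses hypothetical names and takes barrier arrival times as input.

* With exactly-once alignment, an operator with several inputs stops reading from a channel once that channel's barrier arrives. It resumes only after the barriers from all channels are in.
* In at-least-once mode (Edward's configuration), channels are not blocked, but the snapshot still waits for the last barrier.

Either way, a barrier stuck behind accumulated in-flight buffers on one channel delays the whole checkpoint.

```python
def align(barrier_arrival_ms, exactly_once=True):
    """Return (snapshot_time_ms, blocked_ms_per_channel) for one operator.

    barrier_arrival_ms maps each input channel to the time its barrier arrives.
    """
    # The snapshot can only be taken once the last barrier has arrived.
    snapshot_at = max(barrier_arrival_ms.values())
    if exactly_once:
        # Aligned mode: each channel is blocked from its own barrier until the snapshot.
        blocked = {ch: snapshot_at - t for ch, t in barrier_arrival_ms.items()}
    else:
        # At-least-once mode: no channel is blocked, records keep flowing.
        blocked = {ch: 0 for ch in barrier_arrival_ms}
    return snapshot_at, blocked


snapshot_at, blocked = align({"a": 10, "b": 10, "c": 600_000})
print(snapshot_at, blocked)  # 600000 {'a': 599990, 'b': 599990, 'c': 0}
```

Channel `c` stands for a channel whose barrier sat behind about ten minutes of buffered data. Channels `a` and `b` stay blocked for almost all of that time.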

        zhijiang

------------------------------------------------------------------
From: Piotr Nowojski <[hidden email]>
Sent: Friday, 6 April 2018, 00:06
To: Edward <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Checkpoints very slow with high backpressure
