Hi Aljoscha & Fabian,
I have a stream application that has two stream sources, as below:

    KeyedStream<String, String> ks1 = ds1.keyBy("*");
    KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into k-v pairs).keyBy(0);
    ks1.connect(ks2).flatMap(X);

X is a CoFlatMapFunction that inserts and removes elements from ks2 into a key-value state member. Elements from ks1 are matched against that state. The CoFlatMapFunction operator maintains ValueState<Tuple2<Long, Long>>.

I tried to checkpoint every 10 seconds using a FsStateBackend. What I notice is that the checkpoint duration is almost 2 minutes in many cases, while in the other cases it varies frequently between 100 ms and 1.5 minutes. I'm attaching a snapshot of the dashboard for your reference. Is this an issue with Flink checkpointing?

[Attachment: flink_checkpoint_time.png]
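For readers following along, a minimal sketch of such a topology is below. All identifiers, the sources, the key-value encoding and the matching logic are assumptions made purely for illustration; they are not taken from the actual job.

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class TwoStreamMatchSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // stand-ins for the real sources (ks1: high volume, ks2: ~1 event / 10 min)
            DataStream<String> ds1 = env.socketTextStream("localhost", 9001);
            DataStream<String> ds2 = env.socketTextStream("localhost", 9002);

            KeyedStream<String, String> ks1 = ds1.keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String event) {
                    return event;   // assumed: the event itself is the key
                }
            });

            KeyedStream<Tuple2<String, Long>, String> ks2 = ds2
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String record, Collector<Tuple2<String, Long>> out) {
                        String[] kv = record.split(",");          // assumed k-v encoding
                        out.collect(Tuple2.of(kv[0], Long.valueOf(kv[1])));
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> kv) {
                        return kv.f0;
                    }
                });

            ks1.connect(ks2)
                .flatMap(new RichCoFlatMapFunction<String, Tuple2<String, Long>, String>() {

                    private transient ValueState<Tuple2<Long, Long>> matchState;

                    @Override
                    public void open(Configuration parameters) {
                        matchState = getRuntimeContext().getState(new ValueStateDescriptor<>(
                                "match-state",
                                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
                                null));
                    }

                    @Override
                    public void flatMap1(String event, Collector<String> out) throws Exception {
                        // ks1 elements are matched against the state populated from ks2
                        if (matchState.value() != null) {
                            out.collect(event);
                        }
                    }

                    @Override
                    public void flatMap2(Tuple2<String, Long> control, Collector<String> out)
                            throws Exception {
                        // ks2 elements insert/update entries in the keyed state
                        matchState.update(Tuple2.of(control.f1, System.currentTimeMillis()));
                    }
                })
                .print();

            env.execute("two-stream match sketch");
        }
    }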
Hi CVP, I'm not that familiar with the internals of the checkpointing system, but maybe Stephan (in CC) has an idea what's going on here.

2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <[hidden email]>:
Hi! Let's try to figure that one out. Can you give us a bit more information?

- What source are you using for the slow input?
- How large is the state that you are checkpointing?
- Can you try to see in the log whether the state snapshot actually takes that long, or whether it simply takes long for the checkpoint barriers to travel through the stream due to a lot of backpressure?

Greetings, Stephan

On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <[hidden email]> wrote:
Hi Stephan,

Please find my responses below.

- What source are you using for the slow input?
- How large is the state that you are checkpointing?

[CVP] I have enabled checkpointing on the StreamExecutionEnvironment as below:

    final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
    streamEnv.enableCheckpointing(10000);

In terms of the state stored, the KS1 stream has a payload of 100K events/second, while KS2 has about 1 event per 10 minutes. Basically the operators perform flatMaps on 8-field tuples (all fields are primitives). If you look at the state sizes in the dashboard, they are in the order of KB.

- Can you try to see in the log if the state snapshot actually takes that long, or if it simply takes long for the checkpoint barriers to travel through the stream due to a lot of backpressure?

[CVP] There is no backpressure, at least according to the sample computation in the Flink dashboard; 100K events/second is a low load by Flink's benchmarks. I could not quite work out the barrier travel time vs. the snapshot time. I have attached the TaskManager log (DEBUG) in case it is of interest, along with the checkpoint times as a .png from the dashboard. If you look at checkpoint IDs 28, 29 & 30, you'll see that the checkpoints take more than a minute in each case. Before these checkpoints, the KS2 stream did not have any events. As soon as an event (a few bytes in size) was generated, the checkpoints went slow, and every checkpoint thereafter took a minute or more.

The log was collected from a standalone Flink cluster with 1 JobManager and 2 TaskManagers; 1 TaskManager was running this application with checkpointing enabled (parallelism = 1).

Please let me know if you need further info.

On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <[hidden email]> wrote:
[Attachments: flink_job_Plan.png, Flink-Checkpoint-Times.png, flink-qchavar-taskmanager-1-elxa1h67k32.log]
Hi, I am also facing this issue. In my case the data is flowing continuously from the Kafka source; when I increase the checkpoint interval to 60000, the data gets written to the S3 sink. Could it be that some operator is taking more time for processing? In my case I am using a time window of 1 second.

Regards, Vinay Patil

On Mon, Sep 26, 2016 at 10:08 AM, Chakravarthy varaga [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
In reply to this post by Chakravarthy varaga
Thanks, the logs were very helpful!
TL;DR - Committing offsets to ZooKeeper is very slow and prevents checkpoints from starting properly.

Here is what is happening in detail:

- Between the point when the TaskManager receives the "trigger checkpoint" message and the point when the KafkaSource actually starts the checkpoint there is a long delay (many seconds) - for one of the Kafka inputs (the other is fine).
- The only way this delay can be introduced is if another checkpoint-related operation (such as trigger() or notifyComplete()) is still in progress when the checkpoint is started. Flink does not perform concurrent checkpoint operations on a single operator, to ease the concurrency model for users.
- The operation that is still in progress must be the committing of the offsets (to ZooKeeper or Kafka). That also explains why this only happens once one side receives the first record - before that, there is nothing to commit.

What Flink should fix:
- The KafkaConsumer should run the commit operations asynchronously, so that they do not block the notifyCheckpointComplete() method.

What you can fix:
- Have a look at your Kafka/ZooKeeper setup. One Kafka input works well, the other does not. Do they go against different sets of brokers, or different ZooKeepers? Is the metadata for one input bad?
- In the next Flink version, you can opt out of committing offsets to Kafka/ZooKeeper altogether. It is not important for Flink's checkpoints anyway.

Greetings, Stephan

On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <[hidden email]> wrote:
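As a concrete illustration of that opt-out, a consumer configured roughly as below keeps offset commits out of the checkpoint path. Note that setCommitOffsetsOnCheckpoints() is the form this setter later took on FlinkKafkaConsumerBase; it is not available in 1.1.1, and the topic, broker and group names here are assumptions, so treat this purely as a sketch.

    import java.util.Properties;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    // ...

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker-1:9092");   // assumed broker address
    props.setProperty("group.id", "cvp-app");                  // assumed group id

    FlinkKafkaConsumer09<String> consumer =
            new FlinkKafkaConsumer09<>("ks2-topic", new SimpleStringSchema(), props);

    // Opt out of committing offsets back to Kafka/ZooKeeper; Flink's own checkpoints
    // still contain the offsets, so exactly-once guarantees are unaffected.
    consumer.setCommitOffsetsOnCheckpoints(false);

    DataStream<String> ds2 = streamEnv.addSource(consumer);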
In reply to this post by Vinay Patil
@vinay - Is it in your case large state that causes slower checkpoints?
On Mon, Sep 26, 2016 at 6:17 PM, vinay patil <[hidden email]> wrote:
I am not sure about that; I will run the pipeline on the cluster and share the details. Since the window is a stateful operator, it will store only the key part in the state backend and not the value, right?

Regards, Vinay Patil

On Mon, Sep 26, 2016 at 12:13 PM, Stephan Ewen [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
@vinay - Window operators store everything in the state backend.

On Mon, Sep 26, 2016 at 7:34 PM, vinay patil <[hidden email]> wrote:
In reply to this post by Stephan Ewen
Hi Stephan,

Thanks a million for your detailed explanation, I appreciate it.

BTW, the Kafka connector version that I use is as suggested on the Flink connectors page:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.1</version>
    </dependency>

Do you see any issues with the versions?

e) All of my operators

Is my understanding right?

3) Is there a way in Flink to checkpoint only d) as stated above?
4) Can you apply checkpointing to only streams and certain operators (say I wish to store aggregated values as part of the transformation)?

Best Regards
CVP

On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <[hidden email]> wrote:
In reply to this post by Stephan Ewen
Hi Stephan,

OK, I think that may be taking a lot of time. When you say it stores everything, does that mean all of the input to the window is stored in the state backend? For example, for my apply function the input is Iterable<DTO>; the DTO can contain multiple elements, and the DTO has roughly 50 fields. So do you mean that the complete DTO will be stored in the state backend? If yes, then it is probably better to use RocksDB as the state backend.

Also, I am using AWS client-side encryption for writing encrypted data to S3, so maybe that is also taking some time. What do you think?

Regards, Vinay Patil

On Tue, Sep 27, 2016 at 3:51 AM, Stephan Ewen [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
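To make the question concrete, the window step being discussed looks roughly like the sketch below. DTO, its getKey() accessor and the mergeWith() call are hypothetical stand-ins, not the actual pipeline; the point is that every DTO buffered for a window that has not yet fired is part of the operator state each checkpoint has to snapshot.

    // Illustrative sketch only: DTO, getKey() and mergeWith() are hypothetical.
    DataStream<DTO> aggregated = input
        .keyBy(new KeySelector<DTO, String>() {
            @Override
            public String getKey(DTO dto) {
                return dto.getKey();              // assumed key field
            }
        })
        .timeWindow(Time.seconds(1))
        .apply(new WindowFunction<DTO, DTO, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window,
                              Iterable<DTO> elements, Collector<DTO> out) {
                // all DTOs buffered for this window live in the state backend
                DTO merged = null;
                for (DTO dto : elements) {
                    merged = (merged == null) ? dto : merged.mergeWith(dto);  // hypothetical merge
                }
                if (merged != null) {
                    out.collect(merged);
                }
            }
        });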
In reply to this post by Chakravarthy varaga
@CVP In your case, Flink stores in the checkpoints only the Kafka offsets (a few bytes) and the custom state (e). Here is an illustration of the checkpoint and what is stored (from the Flink docs).

I am quite puzzled why the offset committing problem occurs only for one input and not for the other. I am preparing a fix for 1.2, possibly going into 1.1.3 as well. Could you try out a snapshot version to see if that fixes your problem?

Greetings, Stephan

On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <[hidden email]> wrote:
In reply to this post by Vinay Patil
@vinay - Flink needs to store all pending windows in the checkpoint, i.e., windows that have elements but have not yet fired or been purged. I guess the client-side encryption can add to the delay. If you use RocksDB with asynchronous snapshots (1.1.x) then this delay should be hidden.

Greetings, Stephan

On Tue, Sep 27, 2016 at 5:20 PM, vinay patil <[hidden email]> wrote:
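For reference, switching to RocksDB with fully asynchronous snapshots on 1.1.x would look roughly like the sketch below (it needs the flink-statebackend-rocksdb dependency on the classpath). The checkpoint URI is an assumption, and enableFullyAsyncSnapshots() is the 1.1.x opt-in as I recall it, so double-check against the docs for your exact version.

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbAsyncSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            RocksDBStateBackend rocksDb =
                    new RocksDBStateBackend("hdfs:///flink/checkpoints");  // assumed checkpoint URI
            rocksDb.enableFullyAsyncSnapshots();   // 1.1.x: fully asynchronous snapshots are opt-in
            env.setStateBackend(rocksDb);
            env.enableCheckpointing(10000);

            // ... define the windowed pipeline here, then:
            // env.execute("rocksdb async checkpoint sketch");
        }
    }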
Thanks, Stephan, for your inputs. We are seeing the checkpointing issue in other projects as well, where the window and encryption stuff is not involved (using Flink 1.1.1). As you suggested, I will try RocksDB and run the pipeline on EMR to provide more details.

Regards, Vinay Patil

On Tue, Sep 27, 2016 at 1:43 PM, Stephan Ewen [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
In reply to this post by Stephan Ewen
Hi Stephan,

That would be great. Let me know once the fix is done and which snapshot version to use; I'll check and revert then. Can you also share the JIRA that tracks the issue?

On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <[hidden email]> wrote:
Hi Stephan, Is the async Kafka offset commit released in 1.1.3?

On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <[hidden email]> wrote:
Hi, Helping out here: this is the PR for async Kafka offset committing - https://github.com/apache/flink/pull/2574. It has already been merged into the master and release-1.1 branches, so you can try out the changes now if you'd like. The change should also be included in the 1.1.3 release, which the Flink community is currently discussing releasing soon. It will definitely be helpful if you can provide feedback afterwards!

Best Regards, Gordon

On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga ([hidden email]) wrote:
Hi Gordon, Do I need to clone and build the release-1.1 branch to test this?

On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
The plan is to release 1.1.3 asap ;-) We are waiting for the last backported patches to get in, then release testing and the release. If you want to test it today, you would need to manually build the release-1.1 branch.

Best, Stephan

On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <[hidden email]> wrote:
Thanks for your prompt response, Stephan. I'd wait for Flink 1.1.3!!!

On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <[hidden email]> wrote: