Incremental checkpointing performance


Miyuru Dayarathna
Hi,

We ran a performance test of Flink's incremental checkpointing to measure the average time it takes to create a checkpoint and the average checkpoint file size. We ran the test on a single computer to avoid the latencies introduced by network communication. The computer had an Intel® Core™ i7-7600U CPU @ 2.80GHz, 4 cores, 16GB RAM, a 450GB HDD, and 101GB of free SSD space. It was running Ubuntu 16.04 LTS, Apache Flink 1.4.1, and Java version "1.8.0_152" (Java(TM) SE Runtime Environment (build 1.8.0_152-b16), Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)). We used the JVM flags -Xmx8g -Xms8g. The test ran for 40 minutes.

The Flink application we used is as follows:
//---------------------------------------------------------------------------------------------------------
import java.io.IOException;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LengthWindowIncrementalCheckpointing {
    private static DataStream<Tuple4<String, Float, Integer, String>> inputStream = null;
    private static final int PARALLELISM = 1;
    private static final int timeoutMillis = 10;
    private static final int WINDOWLENGTH = 10000;
    private static final int SLIDELENGTH = 1;
    private static final Logger logger = LoggerFactory.getLogger(LengthWindowIncrementalCheckpointing.class);

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);
        // RocksDB state backend; the second constructor argument (true) enables incremental checkpoints
        try {
            env.setStateBackend(new RocksDBStateBackend(
                    new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
                    true));
        } catch (IOException e) {
            e.printStackTrace();
        }

        env.setBufferTimeout(timeoutMillis);
        // MicrobenchSourceFunction is the benchmark's source class (not shown in this post)
        inputStream = env.addSource(new MicrobenchSourceFunction()).setParallelism(PARALLELISM).name("inputStream");

        // keep tuples with f1 > 10, key by field 1, and sum field 2 over a sliding count window
        DataStream<Tuple4<String, Float, Integer, String>> incrementStream2 =
                inputStream.filter(new FilterFunction<Tuple4<String, Float, Integer, String>>() {
                    @Override
                    public boolean filter(Tuple4<String, Float, Integer, String> tuple) throws Exception {
                        return tuple.f1 > 10;
                    }
                }).keyBy(1).countWindow(WINDOWLENGTH, SLIDELENGTH).sum(2);
        // discard the results; only the checkpointing behavior is of interest
        incrementStream2.writeUsingOutputFormat(new DiscardingOutputFormat<Tuple4<String, Float, Integer,
                String>>());

        try {
            env.execute("Flink application.");
        } catch (Exception e) {
            logger.error("Error in starting the Flink stream application: " + e.getMessage(), e);
        }
    }
}

//---------------------------------------------------------------------------------------------------------

I have attached two charts (Average_latencies.jpg and Average_state_sizes.jpg) with the results and another image of the Flink dashboard (Flink-Dashboard.png). The average state size chart indicates that an incremental checkpoint is smaller than a full (i.e., complete) checkpoint. This is the expected behavior of any incremental checkpointing system, since an incremental checkpoint stores only the delta. However, the average latency chart indicates that taking an incremental checkpoint in Flink takes longer on average than taking a full checkpoint. Is this the expected behavior? I have highlighted in "Flink-Dashboard.png" the two fields we used for the average latency and the average state size. Note that to convert the incremental checkpointing application into a full checkpointing application, we just commented out the following lines in the above code.

        try {
            env.setStateBackend(new RocksDBStateBackend(
                    new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
                    true));
        } catch (IOException e) {
            e.printStackTrace();
        }

Thanks,
Miyuru

Attachments: Average_latencies.jpg (46K), Average_state_sizes.jpg (43K), Flink-Dashboard.png (158K)

Re: Incremental checkpointing performance

Nico Kruber
Hi Miyuru,
Indeed, the behaviour you observed sounds strange and goes against
the results Stefan presented in [1]. To see what is going on, can you
also share your changes to Flink's configuration, i.e. flink-conf.yaml?

Let's first make sure you're really comparing RocksDBStateBackend with
vs without incremental checkpoints:
- if you remove this from the code:
    env.setStateBackend(new RocksDBStateBackend(
           new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
           true));
then you will end up with the state backend configured via the
"state.backend" property. Was this set to "rocksdb"? Alternatively, you
can set the second parameter to the RocksDBStateBackend constructor to
false to get the right back-end.

You can also verify the values you see from the web interface by looking
into the logs (at INFO level). There, you should see reports like this:
"Asynchronous RocksDB snapshot (..., asynchronous part) in thread ...
took ... ms." and "Asynchronous RocksDB snapshot (..., synchronous part)
in thread ... took ... ms."

Other than that, from what I know about it (Stefan (cc'd), correct me if
I'm wrong), incremental checkpoints only do hard links locally to the
changed sst files and then copy the data in there to the checkpoint
store (the path you gave). A full checkpoint must copy all current data.
If, between two checkpoints, you write more data than the contents of
the database, e.g. by updating a key multiple times, you may indeed have
more data to store. Judging from the state sizes you gave, this is
probably not the case.
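
To make the difference concrete, here is a toy model in plain Java. It is not Flink or RocksDB code: the class CheckpointUploadModel, its method names, and the file names and sizes used with it are made up for illustration. It only assumes that an incremental checkpoint uploads the sst files that are not yet in the checkpoint store, while a full checkpoint copies all current data.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model only: file names stand in for RocksDB sst files. Hypothetical, not Flink code. */
final class CheckpointUploadModel {
    /** Names of files that earlier incremental checkpoints already copied to the store. */
    private final Set<String> alreadyUploaded = new HashSet<>();

    /** A full checkpoint copies every live file, whether or not it was copied before. */
    static long fullCheckpointBytes(Map<String, Long> liveFiles) {
        long total = 0;
        for (long size : liveFiles.values()) {
            total += size;
        }
        return total;
    }

    /** An incremental checkpoint copies only the files that were not uploaded earlier. */
    long incrementalCheckpointBytes(Map<String, Long> liveFiles) {
        long total = 0;
        for (Map.Entry<String, Long> file : liveFiles.entrySet()) {
            // Set.add returns true only the first time a file name is seen.
            if (alreadyUploaded.add(file.getKey())) {
                total += file.getValue();
            }
        }
        return total;
    }
}
```

With two files of 100 and 200 bytes, the first incremental checkpoint in this model uploads 300 bytes. If one new 50-byte file appears before the next checkpoint, the incremental checkpoint uploads 50 bytes, whereas a full checkpoint would copy 350. If many files are rewritten between checkpoints, the incremental upload can approach or exceed the full size, which is the case described above.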


Let's get started with this and see whether there is anything unusual.


Regards,
Nico


[1]
https://berlin.flink-forward.org/kb_sessions/a-look-at-flinks-internal-data-structures-and-algorithms-for-efficient-checkpointing/


Re: Incremental checkpointing performance

Miyuru Dayarathna
Hi Nico,

Thanks for the detailed explanation. The only change I have made in my flink-conf.yaml file is the following.

state.backend.fs.checkpointdir: file:///home/ubuntu/tmp-flink-rocksdb

The default "state.backend" value is set to filesystem. Neither removing the env.setStateBackend() call nor changing the "state.backend" property to rocksdb changes the state backend to RocksDB. I verified this by looking at the Flink log files. I have included a sample of the log file below for your reference.

-------------------------------------------------------
carbon-5th:38631/user/taskmanager) as 1ac63dfb481eab3d3165a965084115f3. Current number of registered hosts is 1. Current number of alive task slots is 1.
2018-03-19 23:10:11,606 INFO  org.apache.flink.runtime.client.JobClient                     - Checking and uploading JAR files
2018-03-19 23:10:11,618 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Submitting job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink application.).
2018-03-19 23:10:11,623 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=10000) for 7c19a14f4e75149ffaa064fac7e2bf29.
2018-03-19 23:10:11,636 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job recovers via failover strategy: full graph restart
2018-03-19 23:10:11,648 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Running initialization on master for job Flink application. (7c19a14f4e75149ffaa064fac7e2bf29).
2018-03-19 23:10:11,648 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Successfully ran initialization on master in 0 ms.
2018-03-19 23:10:11,664 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Using application-defined state backend for checkpoint/savepoint metadata: RocksDB State Backend {isInitialized=false, configuredDbBasePaths=null, initializedDbBasePaths=null, checkpointStreamBackend=File State Backend @ file:/home/ubuntu/tmp-flink-rocksdb}.
2018-03-19 23:10:11,685 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Scheduling job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink application.).
2018-03-19 23:10:11,685 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink application. (7c19a14f4e75149ffaa064fac7e2bf29) switched from state CREATED to RUNNING.
2018-03-19 23:10:11,692 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched from CREATED to SCHEDULED.
2018-03-19 23:10:11,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75}, CountTrigger(1), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) (796fcd9c38c87b6efb6f512e78e626e9) switched from CREATED to SCHEDULED.
2018-03-19 23:10:11,706 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched from SCHEDULED to DEPLOYING.
2018-03-19 23:10:11,707 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: inputStream -> Filter (1/1) (attempt #0) to computer1
2018-03-19 23:10:11,712 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75}, CountTrigger(1), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) (796fcd9c38c87b6efb6f512e78e626e9) switched from SCHEDULED to DEPLOYING.
2018-03-19 23:10:11,712 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75}, CountTrigger(1), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) (attempt #0) to computer1
2018-03-19 23:10:12,004 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched from DEPLOYING to RUNNING.
2018-03-19 23:10:12,011 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75}, CountTrigger(1), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) (796fcd9c38c87b6efb6f512e78e626e9) switched from DEPLOYING to RUNNING.
2018-03-19 23:10:12,695 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1521481212687
2018-03-19 23:10:12,844 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 (244193 bytes in 155 ms).
2018-03-19 23:10:13,687 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1521481213687
2018-03-19 23:10:13,744 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 (257342 bytes in 46 ms).
2018-03-19 23:10:14,687 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 3 @ 1521481214687
2018-03-19 23:10:14,786 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 3 (271359 bytes in 98 ms).
2018-03-19 23:10:15,688 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 4 @ 1521481215687
2018-03-19 23:10:15,780 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 4 (285375 bytes in 91 ms).
2018-03-19 23:10:16,687 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 5 @ 1521481216687
2018-03-19 23:10:16,764 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 5 (299392 bytes in 76 ms).
----------------------------------------------------------------------------------
I did not see any "Asynchronous RocksDB snapshot ..." messages in the logs. Even after I changed the state backend properties in the flink-conf.yaml file, the log messages remained the same. I think there is some issue with detecting the correct state backend.

Regarding the following sentence,
------------------------------------------------------------------------------------
Other than that, from what I know about it (Stefan (cc'd), correct me if
I'm wrong), incremental checkpoints only do hard links locally to the
changed sst files and then copy the data in there to the checkpoint
store (the path you gave). A full checkpoint must copy all current data.
If, between two checkpoints, you write more data than the contents of
the database, e.g. by updating a key multiple times, you may indeed have
more data to store. Judging from the state sizes you gave, this is
probably not the case.

------------------------------------------------------------------------------------
The charts use the average checkpoint size obtained from the Flink dashboard. I hope the values in the Flink dashboard give an accurate, complete view of the checkpoint sizes. If not, could you please explain how to measure the size of an incremental checkpoint in Flink?
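
As a cross-check on the dashboard values, the "Completed checkpoint" lines in the log sample above can be averaged directly. The following is a minimal sketch in plain Java; the class name CheckpointLogStats and its summarize method are hypothetical helpers, and the pattern assumes the exact message format shown in the sample. Whether the byte count in these lines covers only the delta for incremental checkpoints is not something the log itself tells us.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: averages the size and duration reported in CheckpointCoordinator log lines. */
final class CheckpointLogStats {
    // Matches e.g. "Completed checkpoint 1 (244193 bytes in 155 ms)."
    private static final Pattern COMPLETED =
            Pattern.compile("Completed checkpoint (\\d+) \\((\\d+) bytes in (\\d+) ms\\)");

    /** Returns {count, average bytes, average ms}; all three are 0 if no line matches. */
    static double[] summarize(List<String> logLines) {
        long count = 0;
        long bytes = 0;
        long millis = 0;
        for (String line : logLines) {
            Matcher m = COMPLETED.matcher(line);
            if (m.find()) {
                count++;
                bytes += Long.parseLong(m.group(2));
                millis += Long.parseLong(m.group(3));
            }
        }
        if (count == 0) {
            return new double[] {0, 0, 0};
        }
        return new double[] {count, (double) bytes / count, (double) millis / count};
    }
}
```

For the five completed checkpoints in the sample above, this gives an average of about 271532 bytes and 93.2 ms per checkpoint.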


Thanks,
Miyuru


Re: Incremental checkpointing performance

Miyuru Dayarathna
Hi,

Since we could not find log messages such as "Asynchronous RocksDB snapshot" in Flink's log files, we also ran the application with Flink 1.3.3. It did not print the log message either. Hence I am wondering whether we ran Flink's incremental checkpointing correctly. I have attached the complete application to this email. Could you please run it in your setup and let me know whether you get the incremental-checkpoint log messages?

Thanks,
Miyuru




On Monday, 19 March 2018, 23:33:37 GMT+5:30, Miyuru Dayarathna <[hidden email]> wrote:


Hi Nico,

Thanks for the detailed explanation. The only change I have made in my flink-conf.yaml file is the following.

state.backend.fs.checkpointdir: file:///home/ubuntu/tmp-flink-rocksdb

The default "state.backend" value is set to filesystem. Removing the env.setStateBackend() method code or changing the "state.backend" property to rocksdb does not change the state backend to RocksDB. I got this verified by looking at the Flink log files. I have mentioned a sample of the log file for your reference.

-------------------------------------------------------
carbon-5th:38631/user/taskmanager) as 1ac63dfb481eab3d3165a965084115f3. Current number of registered hosts is 1. Current number of alive task slots is 1.
2018-03-19 23:10:11,606 INFO  org.apache.flink.runtime.client.JobClient                     - Checking and uploading JAR files
2018-03-19 23:10:11,618 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Submitting job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink application.).
2018-03-19 23:10:11,623 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=10000) for 7c19a14f4e75149ffaa064fac7e2bf29.
2018-03-19 23:10:11,636 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job recovers via failover strategy: full graph restart
2018-03-19 23:10:11,648 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Running initialization on master for job Flink application. (7c19a14f4e75149ffaa064fac7e2bf29).
2018-03-19 23:10:11,648 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Successfully ran initialization on master in 0 ms.
2018-03-19 23:10:11,664 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Using application-defined state backend for checkpoint/savepoint metadata: RocksDB State Backend {isInitialized=false, configuredDbBasePaths=null, initializedDbBasePaths=null, checkpointStreamBackend=File State Backend @ file:/home/ubuntu/tmp-flink-rocksdb}.
2018-03-19 23:10:11,685 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Scheduling job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink application.).
2018-03-19 23:10:11,685 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink application. (7c19a14f4e75149ffaa064fac7e2bf29) switched from state CREATED to RUNNING.
2018-03-19 23:10:11,692 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched from CREATED to SCHEDULED.
2018-03-19 23:10:11,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75}, CountTrigger(1), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) (796fcd9c38c87b6efb6f512e78e626e9) switched from CREATED to SCHEDULED.
2018-03-19 23:10:11,706 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched from SCHEDULED to DEPLOYING.
2018-03-19 23:10:11,707 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: inputStream -> Filter (1/1) (attempt #0) to computer1
2018-03-19 23:10:11,712 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75}, CountTrigger(1), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) (796fcd9c38c87b6efb6f512e78e626e9) switched from SCHEDULED to DEPLOYING.
2018-03-19 23:10:11,712 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75}, CountTrigger(1), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) (attempt #0) to computer1
2018-03-19 23:10:12,004 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched from DEPLOYING to RUNNING.
2018-03-19 23:10:12,011 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75}, CountTrigger(1), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) (796fcd9c38c87b6efb6f512e78e626e9) switched from DEPLOYING to RUNNING.
2018-03-19 23:10:12,695 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1521481212687
2018-03-19 23:10:12,844 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 (244193 bytes in 155 ms).
2018-03-19 23:10:13,687 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1521481213687
2018-03-19 23:10:13,744 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 (257342 bytes in 46 ms).
2018-03-19 23:10:14,687 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 3 @ 1521481214687
2018-03-19 23:10:14,786 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 3 (271359 bytes in 98 ms).
2018-03-19 23:10:15,688 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 4 @ 1521481215687
2018-03-19 23:10:15,780 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 4 (285375 bytes in 91 ms).
2018-03-19 23:10:16,687 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 5 @ 1521481216687
2018-03-19 23:10:16,764 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 5 (299392 bytes in 76 ms).
----------------------------------------------------------------------------------
I did not see any "Asynchronous RocksDB snapshot ..." messages in the logs. Even after I changed the state backend properties in the flink-conf.yaml file, the log messages remained the same. I think there is some issue with detecting the correct state backend.

Regarding the following sentence,
------------------------------------------------------------------------------------
Other than that, from what I know about it (Stefan (cc'd), correct me if
I'm wrong), incremental checkpoints only do hard links locally to the
changed sst files and then copy the data in there to the checkpoint
store (the path you gave). A full checkpoint must copy all current data.
If, between two checkpoints, you write more data than the contents of
the database, e.g. by updating a key multiple times, you may indeed have
more data to store. Judging from the state sizes you gave, this is
probably not the case.

------------------------------------------------------------------------------------
I used the average checkpoint size obtained from the Flink dashboard in the charts. I hope the values in the Flink dashboard give a complete and accurate view of the checkpoint sizes. If not, could you please explain how to measure the size of an incremental checkpoint in Flink?


Thanks,
Miyuru

On Monday, 19 March 2018, 19:46:36 GMT+5:30, Nico Kruber <[hidden email]> wrote:


Hi Miyuru,
Indeed, the behaviour you observed sounds strange and seems to go against
the results Stefan presented in [1]. To see what is going on, can you
also share your changes to Flink's configuration, i.e. flink-conf.yaml?

Let's first make sure you're really comparing RocksDBStateBackend with
vs without incremental checkpoints:
- if you remove this from the code:
    env.setStateBackend(new RocksDBStateBackend(
          new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
          true));
then you will end up with the state backend configured via the
"state.backend" property. Was this set to "rocksdb"? Alternatively, you
can set the second parameter to the RocksDBStateBackend constructor to
false to get the right back-end.

You can also verify the values you see from the web interface by looking
into the logs (at INFO level). There, you should see reports like this:
"Asynchronous RocksDB snapshot (..., asynchronous part) in thread ...
took ... ms." and "Asynchronous RocksDB snapshot (..., synchronous part)
in thread ... took ... ms."

Other than that, from what I know about it (Stefan (cc'd), correct me if
I'm wrong), incremental checkpoints only do hard links locally to the
changed sst files and then copy the data in there to the checkpoint
store (the path you gave). A full checkpoint must copy all current data.
If, between two checkpoints, you write more data than the contents of
the database, e.g. by updating a key multiple times, you may indeed have
more data to store. Judging from the state sizes you gave, this is
probably not the case.


Let's get started with this and see whether there is anything unusual.


Regards,
Nico


[1]
https://berlin.flink-forward.org/kb_sessions/a-look-at-flinks-internal-data-structures-and-algorithms-for-efficient-checkpointing/

On 19/03/18 05:25, Miyuru Dayarathna wrote:

> Hi,
>
> We did a performance test of Flink's incremental checkpointing to
> measure the average time it takes to create a checkpoint and the average
> checkpoint file size. We did this test on a single computer in order to
> avoid the latencies introduced by network communication. The computer
> had Intel® Core™ i7-7600U CPU @ 2.80GHz, 4 cores, 16GB RAM, 450GB HDD,
> 101GB free SSD space. The computer was running on Ubuntu 16.04
> LTS, Apache Flink 1.4.1, Java version "1.8.0_152", Java(TM) SE Runtime
> Environment (build 1.8.0_152-b16), Java HotSpot(TM) 64-Bit Server VM
> (build 25.152-b16, mixed mode). We used the JVM flags -Xmx8g -Xms8g. The
> test was run for 40 minutes.
>
> The Flink application we used is as follows,
> //---------------------------------------------------------------------------------------------------------
> public class LengthWindowIncrementalCheckpointing {
>     private static DataStream<Tuple4<String, Float, Integer, String>> inputStream = null;
>     private static final int PARALLELISM = 1;
>     private static final int timeoutMillis = 10;
>     private static final int WINDOWLENGTH = 10000;
>     private static final int SLIDELENGTH = 1;
>     private static Logger logger = LoggerFactory.getLogger(LengthWindowIncrementalCheckpointing.class);
>
>     public static void main(String[] args) {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // start a checkpoint every 1000 ms
>         env.enableCheckpointing(1000);
>         try {
>             env.setStateBackend(new RocksDBStateBackend(
>                     new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
>                     true));
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
>
>         env.setBufferTimeout(timeoutMillis);
>         inputStream = env.addSource(new MicrobenchSourceFunction()).setParallelism(PARALLELISM).name("inputStream");
>
>         DataStream<Tuple4<String, Float, Integer, String>> incrementStream2 =
>                 inputStream.filter(new FilterFunction<Tuple4<String, Float, Integer, String>>() {
>                     @Override
>                     public boolean filter(Tuple4<String, Float, Integer, String> tuple) throws Exception {
>                         if (tuple.f1 > 10) {
>                             return true;
>                         }
>                         return false;
>                     }
>                 }).keyBy(1).countWindow(WINDOWLENGTH, SLIDELENGTH).sum(2);
>         incrementStream2.writeUsingOutputFormat(new DiscardingOutputFormat<Tuple4<String, Float, Integer, String>>());
>
>         try {
>             env.execute("Flink application.");
>         } catch (Exception e) {
>             logger.error("Error in starting the Flink stream application: " + e.getMessage(), e);
>         }
>     }
> }
>
> //---------------------------------------------------------------------------------------------------------
>
> I have attached two charts (Average_latencies.jpg and
> Average_state_sizes.jpg) with the results and another image with the
> Flink dashboard (Flink-Dashboard.png). The average state size chart
> indicates that the size of an incremental checkpoint is smaller than a
> full (i.e., complete) checkpoint. This is the expected behavior from any
> incremental checkpointing system since the incremental checkpoint just
> stores the delta change. However, the average latency chart indicates
> that the average latency for taking an incremental checkpoint from Flink
> is larger than taking a complete (i.e., Full) checkpoint. Is this the
> expected behavior? I have highlighted the two fields in the
> "Flink-Dashboard.png" which we used as the fields for the average
> latency and the average state size. Note that to convert the incremental
> checkpointing application to full checkpointing application we just
> commented out the following lines in the above code.
>
>         try {
>             env.setStateBackend(new RocksDBStateBackend(
>                     new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
>                     true));
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
>
> Thanks,
> Miyuru

incrementalcheckpoints.zip (96K) Download Attachment

Re: Incremental checkpointing performance

Nico Kruber
Hi Miyuru,
regarding "state.backend", I was looking at version 1.5 docs and some
things changed compared to 1.3. The "Asynchronous RocksDB snapshot ..."
messages only occur with full snapshots, i.e. non-incremental, and I
verified this for your program as well.

There are some issues with your project though:
1) your Flink dependencies should all have the same version
2) your source does not acquire the checkpoint lock before emitting
events (see the docs around the SourceFunction you are implementing)
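
A minimal sketch of the pattern behind point 2, written in plain Java so that it runs without Flink on the classpath. `SourceContextLike` is a hypothetical stand-in for the two methods of Flink's `SourceFunction.SourceContext` that matter here (`collect` and `getCheckpointLock`); `LockedCountingSource` and its counter are illustrative only and are not taken from the attached project. In a real source, `run` would receive Flink's own `SourceContext`.

```java
/** Hypothetical stand-in for the parts of Flink's SourceFunction.SourceContext used below. */
interface SourceContextLike<T> {
    void collect(T element);

    Object getCheckpointLock();
}

/** Illustrative source: emits 0, 1, 2, ... and updates its own state under the checkpoint lock. */
final class LockedCountingSource {
    private volatile boolean running = true;
    private long emitted = 0; // state that a checkpoint would need to capture

    void run(SourceContextLike<Long> ctx, long limit) {
        while (running && emitted < limit) {
            // Emitting the record and updating the state happen as one step
            // with respect to checkpoints, because both are done under the lock.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(emitted);
                emitted++;
            }
        }
    }

    void cancel() {
        running = false;
    }

    long emitted() {
        return emitted;
    }
}
```

Holding the lock only around the emit-and-update step, rather than around the whole loop, leaves room for a checkpoint to be taken between two records.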


Regarding the checkpoint sizes: you can rely on the web interface
reporting correct metrics. However, the "average" values may not be very
useful for you since you are using a sliding count window and thus
during ramp-up (until you get your 10000 windows of the slide size) you
will have smaller states than after that. Since you only have 2 keys,
you will eventually have 20000 window states to store and from then on
stay with this number. So rather look at the "History" column of the web
interface or into the JobManager log.
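
The ramp-up described above can be sketched with a few lines of arithmetic. This is only an illustration under two assumptions: the count evictor keeps at most WINDOWLENGTH entries per key, and both keys receive elements at the same rate. The class and method names are made up for this sketch.

```java
/** Illustrative arithmetic for the ramp-up of a sliding count window. */
final class SlidingCountWindowRampUp {
    /** Entries kept for one key after it has received `seen` elements. */
    static long retainedPerKey(long seen, long windowLength) {
        return Math.min(seen, windowLength);
    }

    /** Entries kept across all keys, assuming every key has received `seenPerKey` elements. */
    static long retainedTotal(long seenPerKey, long windowLength, int keys) {
        return keys * retainedPerKey(seenPerKey, windowLength);
    }

    public static void main(String[] args) {
        long windowLength = 10000; // WINDOWLENGTH in the posted application
        int keys = 2;              // number of keys mentioned above
        for (long seen : new long[] {500, 10000, 50000}) {
            System.out.println(seen + " elements per key -> "
                    + retainedTotal(seen, windowLength, keys) + " retained entries");
        }
    }
}
```

With these numbers the retained count grows until each key has seen 10000 elements and then stays at 20000, which is why averages that include the ramp-up phase understate the steady-state checkpoint size.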


Regarding the original issue: I was recently made aware of another thing
which may influence the speed of an incremental snapshot: if done
incrementally, we need to close and flush RocksDB's sst file so that it
continues with a new file and we can hardlink and copy a consistent
snapshot. For full snapshots, we simply iterate over all items to copy.
Now this close-and-flush may be more costly (hence the higher duration)
and since this cannot be done asynchronously (unlike a full snapshot) we
also may not process as many records.
-> Therefore, you probably did not run your program long enough to
create the full set of windows and I'm guessing, you will eventually get
to the same checkpoint sizes.


TL;DR: incremental snapshots are only worthwhile (and are designed for
the case) where you have a lot of operator state (not just a few MB!)
while only a few parts actually change between checkpoints. In these
scenarios, the time saved by transferring a smaller snapshot to the
checkpoint store over the network would outweigh the additional cost
during snapshot creation.


Nico


On 21/03/18 06:01, Miyuru Dayarathna wrote:

> Hi,
>
> Since we could not observe log messages such as "Asynchronous RocksDB
> snapshot" in the Flink's log files, we ran the application with Flink
> 1.3.3 as well. But it also did not print the log message. Hence I am
> wondering whether we ran Flink's incremental checkpointing in the
> correct manner. I have attached the complete application with this
> email. Could you please run this in your setup and let me know whether
> you get the incremental checkpoint related logs printed in your Flink setup?
>
> Thanks,
> Miyuru
>
>
>
>
> On Monday, 19 March 2018, 23:33:37 GMT+5:30, Miyuru Dayarathna
> <[hidden email]> wrote:
>
>
> Hi Nico,
>
> Thanks for the detailed explanation. The only change I have made in my
> flink-conf.yaml file is the following.
>
> state.backend.fs.checkpointdir: file:///home/ubuntu/tmp-flink-rocksdb
>
> The default "state.backend" value is set to filesystem. Removing the
> env.setStateBackend() method code or changing the "state.backend"
> property to rocksdb does not change the state backend to RocksDB. I got
> this verified by looking at the Flink log files. I have mentioned a
> sample of the log file for your reference.
>
> -------------------------------------------------------
> carbon-5th:38631/user/taskmanager) as 1ac63dfb481eab3d3165a965084115f3.
> Current number of registered hosts is 1. Current number of alive task
> slots is 1.
> 2018-03-19 23:10:11,606 INFO 
> org.apache.flink.runtime.client.JobClient                     - Checking
> and uploading JAR files
> 2018-03-19 23:10:11,618 INFO 
> org.apache.flink.runtime.jobmanager.JobManager                -
> Submitting job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink application.).
> 2018-03-19 23:10:11,623 INFO 
> org.apache.flink.runtime.jobmanager.JobManager                - Using
> restart strategy
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
> delayBetweenRestartAttempts=10000) for 7c19a14f4e75149ffaa064fac7e2bf29.
> 2018-03-19 23:10:11,636 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> recovers via failover strategy: full graph restart
> 2018-03-19 23:10:11,648 INFO 
> org.apache.flink.runtime.jobmanager.JobManager                - Running
> initialization on master for job Flink application.
> (7c19a14f4e75149ffaa064fac7e2bf29).
> 2018-03-19 23:10:11,648 INFO 
> org.apache.flink.runtime.jobmanager.JobManager                -
> Successfully ran initialization on master in 0 ms.
> 2018-03-19 23:10:11,664 INFO 
> org.apache.flink.runtime.jobmanager.JobManager                - Using
> application-defined state backend for checkpoint/savepoint metadata:
> RocksDB State Backend {isInitialized=false, configuredDbBasePaths=null,
> initializedDbBasePaths=null, checkpointStreamBackend=File State Backend
> @ file:/home/ubuntu/tmp-flink-rocksdb}.
> 2018-03-19 23:10:11,685 INFO 
> org.apache.flink.runtime.jobmanager.JobManager                -
> Scheduling job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink application.).
> 2018-03-19 23:10:11,685 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> Flink application. (7c19a14f4e75149ffaa064fac7e2bf29) switched from
> state CREATED to RUNNING.
> 2018-03-19 23:10:11,692 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched
> from CREATED to SCHEDULED.
> 2018-03-19 23:10:11,698 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> TriggerWindow(GlobalWindows(),
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
> CountTrigger(1),
> org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a,
> WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1)
> (796fcd9c38c87b6efb6f512e78e626e9) switched from CREATED to SCHEDULED.
> 2018-03-19 23:10:11,706 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched
> from SCHEDULED to DEPLOYING.
> 2018-03-19 23:10:11,707 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Deploying Source: inputStream -> Filter (1/1) (attempt #0) to computer1
> 2018-03-19 23:10:11,712 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> TriggerWindow(GlobalWindows(),
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
> CountTrigger(1),
> org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a,
> WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1)
> (796fcd9c38c87b6efb6f512e78e626e9) switched from SCHEDULED to DEPLOYING.
> 2018-03-19 23:10:11,712 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Deploying TriggerWindow(GlobalWindows(),
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
> CountTrigger(1),
> org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a,
> WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1)
> (attempt #0) to computer1
> 2018-03-19 23:10:12,004 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched
> from DEPLOYING to RUNNING.
> 2018-03-19 23:10:12,011 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> TriggerWindow(GlobalWindows(),
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
> CountTrigger(1),
> org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a,
> WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1)
> (796fcd9c38c87b6efb6f512e78e626e9) switched from DEPLOYING to RUNNING.
> 2018-03-19 23:10:12,695 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Triggering checkpoint 1 @ 1521481212687
> 2018-03-19 23:10:12,844 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Completed checkpoint 1 (244193 bytes in 155 ms).
> 2018-03-19 23:10:13,687 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Triggering checkpoint 2 @ 1521481213687
> 2018-03-19 23:10:13,744 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Completed checkpoint 2 (257342 bytes in 46 ms).
> 2018-03-19 23:10:14,687 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Triggering checkpoint 3 @ 1521481214687
> 2018-03-19 23:10:14,786 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Completed checkpoint 3 (271359 bytes in 98 ms).
> 2018-03-19 23:10:15,688 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Triggering checkpoint 4 @ 1521481215687
> 2018-03-19 23:10:15,780 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Completed checkpoint 4 (285375 bytes in 91 ms).
> 2018-03-19 23:10:16,687 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Triggering checkpoint 5 @ 1521481216687
> 2018-03-19 23:10:16,764 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Completed checkpoint 5 (299392 bytes in 76 ms).
> ----------------------------------------------------------------------------------
> I did not see any "Asynchronous RocksDB snapshot ..." messages in the
> logs. Even after I changed the state backend properties in the
> flink-conf.yaml file, the log messages remained the same. I think there is
> some issue with detecting the correct state backend.
>
> Regarding the following sentence,
> ------------------------------------------------------------------------------------
> Other than that, from what I know about it (Stefan (cc'd), correct me if
> I'm wrong), incremental checkpoints only do hard links locally to the
> changed sst files and then copy the data in there to the checkpoint
> store (the path you gave). A full checkpoint must copy all current data.
> If, between two checkpoints, you write more data than the contents of
> the database, e.g. by updating a key multiple times, you may indeed have
> more data to store. Judging from the state sizes you gave, this is
> probably not the case.
>
> ------------------------------------------------------------------------------------
> I used the average checkpoint size obtained from the Flink dashboard in
> the charts. I hope the values in the Flink dashboard give a complete and
> accurate view of the checkpoint sizes. If not, could you please explain
> how to measure the size of an incremental checkpoint in Flink?
>
>
> Thanks,
> Miyuru
>
--
Nico Kruber | Software Engineer
data Artisans


Re: Incremental checkpointing performance

Miyuru Dayarathna
Hi Nico,

Thanks for the detailed explanation. I corrected the two issues you mentioned in my application and was able to observe the behavior you described with Flink 1.4.1. As you said, the "Asynchronous RocksDB snapshot ..." message appears only for full snapshots. The incremental snapshot version of the application does not print that message. It should be noted that these messages appear only in the TaskManager log file.

Based on your description of the checkpoint sizes, I think it is better to look up entries such as "Completed checkpoint 25 (290061 bytes in 60 ms)" in the JobManager log file and to calculate the average checkpoint latencies only after the window has been completely filled (i.e., after the application reaches the steady state).
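
One way this measurement could be done is sketched below in plain Java. It assumes the log lines have the "Completed checkpoint N (X bytes in Y ms)" form shown in this thread; the class name `CheckpointLogStats` and the `firstSteadyId` cut-off (the first checkpoint id considered to be in steady state) are illustrative choices, and how to pick that cut-off is left to the person running the experiment.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Averages checkpoint size and duration from JobManager log lines, skipping warm-up checkpoints. */
final class CheckpointLogStats {
    // Matches e.g. "Completed checkpoint 25 (290061 bytes in 60 ms)".
    private static final Pattern COMPLETED =
            Pattern.compile("Completed checkpoint (\\d+) \\((\\d+) bytes in (\\d+) ms\\)");

    /**
     * Returns {averageBytes, averageMillis} over all completed checkpoints whose id is
     * at least firstSteadyId, or null if no such checkpoint appears in the lines.
     */
    static double[] averages(List<String> logLines, long firstSteadyId) {
        long count = 0;
        long bytes = 0;
        long millis = 0;
        for (String line : logLines) {
            Matcher m = COMPLETED.matcher(line);
            if (!m.find() || Long.parseLong(m.group(1)) < firstSteadyId) {
                continue; // not a completion line, or still in the warm-up phase
            }
            bytes += Long.parseLong(m.group(2));
            millis += Long.parseLong(m.group(3));
            count++;
        }
        return count == 0 ? null : new double[] {(double) bytes / count, (double) millis / count};
    }

    public static void main(String[] args) {
        // Sample lines taken from the log excerpt quoted in this thread.
        List<String> lines = Arrays.asList(
                "CheckpointCoordinator - Completed checkpoint 3 (271359 bytes in 98 ms).",
                "CheckpointCoordinator - Completed checkpoint 4 (285375 bytes in 91 ms).",
                "CheckpointCoordinator - Completed checkpoint 5 (299392 bytes in 76 ms).");
        double[] avg = averages(lines, 4);
        System.out.println("average bytes: " + avg[0] + ", average ms: " + avg[1]);
    }
}
```

In practice the lines would be read from the JobManager log file instead of a hard-coded list.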

Thanks for explaining why taking an incremental checkpoint takes more time than taking a full checkpoint in this setup. As you said, flushing the data and closing the RocksDB sst file should add considerable overhead. Hence, repeatedly doing this with only a few MB of state should result in a larger average elapsed time for incremental snapshots than for full snapshots. Also, as you said, I will double-check whether the full windows were created before I measure the checkpoint sizes and checkpoint durations.

Thanks,
Miyuru

On Friday, 23 March 2018, 21:56:02 GMT+5:30, Nico Kruber <[hidden email]> wrote:


Hi Miyuru,
regarding "state.backend", I was looking at version 1.5 docs and some
things changed compared to 1.3. The "Asynchronous RocksDB snapshot ..."
messages only occur with full snapshots, i.e. non-incremental, and I
verified this for your program as well.

There are some issues with your project though:
1) your Flink dependencies should all have the same version
2) your source does not acquire the checkpoint lock before emitting
events (see the docs around the SourceFunction you are implementing)


Regarding the checkpoint sizes: you can rely on the web interface
reporting correct metrics. However, the "average" values may not be too
much useful for you since you are using a sliding count window and thus
during ramp-up (until you get your 10000 windows of the slide size) you
will have smaller states than after than. Since you only have 2 keys,
you will eventually have 20000 window states to store and from then on
stay with this number. So rather look at the "History" column of the web
interface or into the JobManager log.


Regarding the original issue: I was recently made aware of another thing
which may influence the speed of an incremental snapshot: if done
incrementally, we need to close and flush RocksDB's sst file so that it
continues with a new file and we can hardlink and copy a consistent
snapshot. For full snapshots, we simple iterate over all items to copy.
Now this close-and-flush may be more costly (hence the higher duration)
and since this cannot be done asynchronously (as a full snapshot) we
also may not process as many records.
-> Therefore, you probably did not run your program long enough to
create the full set of windows and I'm guessing, you will eventually get
to the same checkpoint sizes.


TLDR; incremental snapshots are worth only (and are designed for...) if
you have a lot of operator state (not just a few MB!) while only few
parts are actually changing between checkpoints. In these scenarios, the
added latency for transferring such a snapshot to the checkpoint store
over network would cover the additional cost during snapshot creation.


Nico


On 21/03/18 06:01, Miyuru Dayarathna wrote:

> Hi,
>
> Since we could not observe log messages such as "Asynchronous RocksDB
> snapshot" in the Flink's log files, we ran the application with Flink
> 1.3.3 as well. But it also did not print the log message. Hence I am
> wondering whether we ran Flink's incremental checkpointing in the
> correct manner. I have attached the complete application with this
> email. Could you please run this in your setup and let me know whether
> you get the incremental checkpoint related logs printed in your Flink setup?
>
> Thanks,
> Miyuru
>
>
>
>
> On Monday, 19 March 2018, 23:33:37 GMT+5:30, Miyuru Dayarathna
> <[hidden email]> wrote:
>
>
> Hi Nico,
>
> Thanks for the detailed explanation. The only change I have made in my
> flink-conf.yaml file is the following.
>
> state.backend.fs.checkpointdir: file:///home/ubuntu/tmp-flink-rocksdb
>
> The default "state.backend" value is set to filesystem. Removing the
> env.setStateBackend() method code or changing the "state.backend"
> property to rocksdb does not change the state backend to RocksDB. I got
> this verified by looking at the Flink log files. I have mentioned a
> sample of the log file for your reference.
>
> -------------------------------------------------------
> carbon-5th:38631/user/taskmanager) as 1ac63dfb481eab3d3165a965084115f3.
> Current number of registered hosts is 1. Current number of alive task
> slots is 1.
> 2018-03-19 23:10:11,606 INFO 
> org.apache.flink.runtime.client.JobClient                     - Checking
> and uploading JAR files
> 2018-03-19 23:10:11,618 INFO 
> org.apache.flink.runtime.jobmanager.JobManager                -
> Submitting job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink application.).
> 2018-03-19 23:10:11,623 INFO 
> org.apache.flink.runtime.jobmanager.JobManager                - Using
> restart strategy
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
> delayBetweenRestartAttempts=10000) for 7c19a14f4e75149ffaa064fac7e2bf29.
> 2018-03-19 23:10:11,636 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> recovers via failover strategy: full graph restart
> 2018-03-19 23:10:11,648 INFO 
> org.apache.flink.runtime.jobmanager.JobManager                - Running
> initialization on master for job Flink application.
> (7c19a14f4e75149ffaa064fac7e2bf29).
> 2018-03-19 23:10:11,648 INFO 
> org.apache.flink.runtime.jobmanager.JobManager                -
> Successfully ran initialization on master in 0 ms.
> 2018-03-19 23:10:11,664 INFO 
> org.apache.flink.runtime.jobmanager.JobManager                - Using
> application-defined state backend for checkpoint/savepoint metadata:
> RocksDB State Backend {isInitialized=false, configuredDbBasePaths=null,
> initializedDbBasePaths=null, checkpointStreamBackend=File State Backend
> @ file:/home/ubuntu/tmp-flink-rocksdb}.
> 2018-03-19 23:10:11,685 INFO 
> org.apache.flink.runtime.jobmanager.JobManager                -
> Scheduling job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink application.).
> 2018-03-19 23:10:11,685 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> Flink application. (7c19a14f4e75149ffaa064fac7e2bf29) switched from
> state CREATED to RUNNING.
> 2018-03-19 23:10:11,692 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched
> from CREATED to SCHEDULED.
> 2018-03-19 23:10:11,698 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> TriggerWindow(GlobalWindows(),
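> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
> CountTrigger(1),
> org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a,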
> WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1)
> (796fcd9c38c87b6efb6f512e78e626e9) switched from CREATED to SCHEDULED.
> 2018-03-19 23:10:11,706 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched
> from SCHEDULED to DEPLOYING.
> 2018-03-19 23:10:11,707 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Deploying Source: inputStream -> Filter (1/1) (attempt #0) to computer1
> 2018-03-19 23:10:11,712 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> TriggerWindow(GlobalWindows(),
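> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
> CountTrigger(1),
> org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a,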
> WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1)
> (796fcd9c38c87b6efb6f512e78e626e9) switched from SCHEDULED to DEPLOYING.
> 2018-03-19 23:10:11,712 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Deploying TriggerWindow(GlobalWindows(),
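> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
> CountTrigger(1),
> org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a,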
> WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1)
> (attempt #0) to computer1
> 2018-03-19 23:10:12,004 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched
> from DEPLOYING to RUNNING.
> 2018-03-19 23:10:12,011 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> TriggerWindow(GlobalWindows(),
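> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
> CountTrigger(1),
> org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a,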
> WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1)
> (796fcd9c38c87b6efb6f512e78e626e9) switched from DEPLOYING to RUNNING.
> 2018-03-19 23:10:12,695 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Triggering checkpoint 1 @ 1521481212687
> 2018-03-19 23:10:12,844 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Completed checkpoint 1 (244193 bytes in 155 ms).
> 2018-03-19 23:10:13,687 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Triggering checkpoint 2 @ 1521481213687
> 2018-03-19 23:10:13,744 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Completed checkpoint 2 (257342 bytes in 46 ms).
> 2018-03-19 23:10:14,687 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Triggering checkpoint 3 @ 1521481214687
> 2018-03-19 23:10:14,786 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Completed checkpoint 3 (271359 bytes in 98 ms).
> 2018-03-19 23:10:15,688 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Triggering checkpoint 4 @ 1521481215687
> 2018-03-19 23:10:15,780 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Completed checkpoint 4 (285375 bytes in 91 ms).
> 2018-03-19 23:10:16,687 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Triggering checkpoint 5 @ 1521481216687
> 2018-03-19 23:10:16,764 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> Completed checkpoint 5 (299392 bytes in 76 ms).
> ----------------------------------------------------------------------------------
> I did not get any "Asynchronous RocksDB snapshot ..." messages in the
> logs. Even after I changed the state backend properties in the
> flink-conf.yaml file, the log messages remained the same. I think there
> is some issue with detecting the correct state backend.
>
> Regarding the following sentence,
> ------------------------------------------------------------------------------------
> Other than that, from what I know about it (Stefan (cc'd), correct me if
> I'm wrong), incremental checkpoints only do hard links locally to the
> changed sst files and then copy the data in there to the checkpoint
> store (the path you gave). A full checkpoint must copy all current data.
> If, between two checkpoints, you write more data than the contents of
> the database, e.g. by updating a key multiple times, you may indeed have
> more data to store. Judging from the state sizes you gave, this is
> probably not the case.
>
> ------------------------------------------------------------------------------------
> I have used the average checkpoint size in the charts, which was obtained
> through the Flink dashboard. I hope the values in the Flink dashboard
> show an accurate, holistic view of the checkpoint sizes. If not, could
> you please explain how to measure the size of an incremental checkpoint
> in Flink?
>
>
> Thanks,
> Miyuru
>
> On Monday, 19 March 2018, 19:46:36 GMT+5:30, Nico Kruber
> <[hidden email]> wrote:
>
>
> Hi Miyuru,
> Indeed, the behaviour you observed sounds strange and kind of goes against
> the results Stefan presented in [1]. To see what is going on, can you
> also share your changes to Flink's configuration, i.e. flink-conf.yaml?
>
> Let's first make sure you're really comparing RocksDBStateBackend with
> vs without incremental checkpoints:
> - if you remove this from the code:
>     env.setStateBackend(new RocksDBStateBackend(
>           new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
>           true));
> then you will end up with the state backend configured via the
> "state.backend" property. Was this set to "rocksdb"? Alternatively, you
> can set the second parameter to the RocksDBStateBackend constructor to
> false to get the right back-end.
>
> You can also verify the values you see from the web interface by looking
> into the logs (at INFO level). There, you should see reports like this:
> "Asynchronous RocksDB snapshot (..., asynchronous part) in thread ...
> took ... ms." and "Asynchronous RocksDB snapshot (..., synchronous part)
> in thread ... took ... ms."
>
> Other than that, from what I know about it (Stefan (cc'd), correct me if
> I'm wrong), incremental checkpoints only do hard links locally to the
> changed sst files and then copy the data in there to the checkpoint
> store (the path you gave). A full checkpoint must copy all current data.
> If, between two checkpoints, you write more data than the contents of
> the database, e.g. by updating a key multiple times, you may indeed have
> more data to store. Judging from the state sizes you gave, this is
> probably not the case.
>
>
> Let's get started with this and see whether there is anything unusual.
>
>
> Regards,
> Nico
>
>
> [1]
> https://berlin.flink-forward.org/kb_sessions/a-look-at-flinks-internal-data-structures-and-algorithms-for-efficient-checkpointing/
>
> On 19/03/18 05:25, Miyuru Dayarathna wrote:
>> Hi,
>>
>> We did a performance test of Flink's incremental checkpointing to
>> measure the average time it takes to create a checkpoint and the average
>> checkpoint file size. We did this test on a single computer in order to
>> avoid the latencies introduced by network communication. The computer
>> had Intel® Core™ i7-7600U CPU @ 2.80GHz, 4 cores, 16GB RAM, 450GB HDD,
>> 101GB free SSD space. The computer was running on Ubuntu 16.04
>> LTS, Apache Flink 1.4.1, Java version "1.8.0_152", Java(TM) SE Runtime
>> Environment (build 1.8.0_152-b16), Java HotSpot(TM) 64-Bit Server VM
>> (build 25.152-b16, mixed mode). We used the JVM flags -Xmx8g -Xms8g. The
>> test was run for 40 minutes.
>>
>> The Flink application we used is as follows,
>>
>> //---------------------------------------------------------------------------------------------------------
>> public class LengthWindowIncrementalCheckpointing {
>>     private static DataStream<Tuple4<String, Float, Integer, String>> inputStream = null;
>>     private static final int PARALLELISM = 1;
>>     private static final int timeoutMillis = 10;
>>     private static final int WINDOWLENGTH = 10000;
>>     private static final int SLIDELENGTH = 1;
>>     private static Logger logger = LoggerFactory.getLogger(LengthWindowIncrementalCheckpointing.class);
>>
>>     public static void main(String[] args) {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         // start a checkpoint every 1000 ms
>>         env.enableCheckpointing(1000);
>>         try {
>>             env.setStateBackend(new RocksDBStateBackend(
>>                     new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
>>                     true));
>>         } catch (IOException e) {
>>             e.printStackTrace();
>>         }
>>
>>         env.setBufferTimeout(timeoutMillis);
>>         inputStream = env.addSource(new MicrobenchSourceFunction()).setParallelism(PARALLELISM).name("inputStream");
>>
>>         DataStream<Tuple4<String, Float, Integer, String>> incrementStream2 =
>>                 inputStream.filter(new FilterFunction<Tuple4<String, Float, Integer, String>>() {
>>                     @Override
>>                     public boolean filter(Tuple4<String, Float, Integer, String> tuple) throws Exception {
>>                         if (tuple.f1 > 10) {
>>                             return true;
>>                         }
>>                         return false;
>>                     }
>>                 }).keyBy(1).countWindow(WINDOWLENGTH, SLIDELENGTH).sum(2);
>>         incrementStream2.writeUsingOutputFormat(new DiscardingOutputFormat<Tuple4<String, Float, Integer, String>>());
>>
>>         try {
>>             env.execute("Flink application.");
>>         } catch (Exception e) {
>>             logger.error("Error in starting the Flink stream application: " + e.getMessage(), e);
>>         }
>>     }
>> }
>>
>> //---------------------------------------------------------------------------------------------------------
>>
>> I have attached two charts (Average_latencies.jpg and
>> Average_state_sizes.jpg) with the results and another image with the
>> Flink dashboard (Flink-Dashboard.png). The average state size chart
>> indicates that the size of an incremental checkpoint is smaller than a
>> full (i.e., complete) checkpoint. This is the expected behavior from any
>> incremental checkpointing system since the incremental checkpoint just
>> stores the delta change. However, the average latency chart indicates
>> that the average latency for taking an incremental checkpoint from Flink
>> is larger than taking a complete (i.e., Full) checkpoint. Is this the
>> expected behavior? I have highlighted the two fields in the
>> "Flink-Dashboard.png" which we used as the fields for the average
>> latency and the average state size. Note that to convert the incremental
>> checkpointing application to full checkpointing application we just
>> commented out the following lines in the above code.
>>
>>         try {
>>             env.setStateBackend(new RocksDBStateBackend(
>>                     new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
>>                     true));
>>         } catch (IOException e) {
>>             e.printStackTrace();
>>         }
>>
>> Thanks,
>> Miyuru

--
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen

Re: Incremental checkpointing performance

Stephan Ewen
In reply to this post by Nico Kruber
I think what happens is the following:

  - For full checkpoints, Flink iterates asynchronously over the data. That means the whole checkpoint is a compact asynchronous operation.

  - For incremental checkpoints, RocksDB has to flush the write buffer and create a new SSTable. That flush is synchronous, but should be very brief. Then there is an asynchronous materialization of the SSTables that are different from the previous checkpoint.

Because of that, you see that
  - Full checkpoints have a shorter synchronous duration than incremental checkpoints
  - For small state, full checkpoints may actually be faster end-to-end
  - For large state, the asynchronous part of incremental checkpoints should be faster, and with that, the end-to-end duration as well
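
The trade-off in the list above can be pictured with a toy cost model. This is only a sketch: the class name, the flush cost, and the throughput figure are made-up illustration values, not measurements and not anything Flink or RocksDB exposes. A full checkpoint is modelled as a purely asynchronous pass over all state, and an incremental one as a fixed synchronous flush plus an asynchronous upload of the changed bytes.

```java
/** Toy cost model for full vs. incremental checkpoints (illustration only). */
final class CheckpointCostSketch {

    // Both constants are hypothetical values chosen for the illustration.
    static final double FLUSH_MS = 50.0;          // synchronous write-buffer flush (incremental only)
    static final double BYTES_PER_MS = 100_000.0; // asynchronous write throughput

    /** Full checkpoint: iterate asynchronously over all state. */
    static double fullMillis(long stateBytes) {
        return stateBytes / BYTES_PER_MS;
    }

    /** Incremental checkpoint: synchronous flush, then upload only the changed SSTable bytes. */
    static double incrementalMillis(long changedBytes) {
        return FLUSH_MS + changedBytes / BYTES_PER_MS;
    }

    public static void main(String[] args) {
        long smallState = 300_000L;          // a few hundred KB, all of it changed
        long largeState = 10_000_000_000L;   // about 10 GB in total
        long largeChanged = 100_000_000L;    // about 100 MB changed since the last checkpoint

        System.out.println("small state, full:        " + fullMillis(smallState) + " ms");
        System.out.println("small state, incremental: " + incrementalMillis(smallState) + " ms");
        System.out.println("large state, full:        " + fullMillis(largeState) + " ms");
        System.out.println("large state, incremental: " + incrementalMillis(largeChanged) + " ms");
    }
}
```

With any fixed flush cost, the incremental variant loses while the state is small and wins once the unchanged part of the state dominates; where the crossover sits depends entirely on the real flush cost and throughput of the setup.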

Stephan

On Fri, Mar 23, 2018 at 5:25 PM, Nico Kruber <[hidden email]> wrote:
Hi Miyuru,
regarding "state.backend", I was looking at version 1.5 docs and some
things changed compared to 1.3. The "Asynchronous RocksDB snapshot ..."
messages only occur with full snapshots, i.e. non-incremental, and I
verified this for your program as well.

There are some issues with your project though:
1) your Flink dependencies should all have the same version
2) your source does not acquire the checkpoint lock before emitting
events (see the docs around the SourceFunction you are implementing)
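
For point 2, the usual pattern is to emit the element and update the source's own state inside one synchronized block on the checkpoint lock. The sketch below is self-contained and does not use Flink: SketchSourceContext is a hypothetical stand-in that only mirrors the collect() and getCheckpointLock() methods of Flink's SourceFunction.SourceContext, and the other class names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two SourceContext methods used here. */
interface SketchSourceContext<T> {
    void collect(T element);
    Object getCheckpointLock();
}

/** Source that emits a counter; the counter is the state a checkpoint would snapshot. */
final class LockedCounterSource {
    private volatile boolean running = true;
    private long nextValue = 0;

    void run(SketchSourceContext<Long> ctx, long limit) {
        while (running && nextValue < limit) {
            // Emit and update state atomically with respect to checkpoints.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(nextValue);
                nextValue++;
            }
        }
    }

    /** What a checkpoint would read; the caller must hold the checkpoint lock. */
    long snapshotState() {
        return nextValue;
    }

    void cancel() {
        running = false;
    }
}

final class CheckpointLockDemo {
    /** Runs the source while taking snapshots; returns true if every snapshot was consistent. */
    static boolean run(long limit) {
        final Object lock = new Object();
        final List<Long> emitted = new ArrayList<>();
        SketchSourceContext<Long> ctx = new SketchSourceContext<Long>() {
            @Override public void collect(Long element) { emitted.add(element); }
            @Override public Object getCheckpointLock() { return lock; }
        };
        LockedCounterSource source = new LockedCounterSource();
        Thread worker = new Thread(() -> source.run(ctx, limit));
        worker.start();

        boolean consistent = true;
        while (worker.isAlive()) {
            synchronized (lock) {
                // Under the lock, the emitted output and the source state must agree.
                consistent &= emitted.size() == source.snapshotState();
            }
            Thread.yield();
        }
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        synchronized (lock) {
            consistent &= emitted.size() == limit && source.snapshotState() == limit;
        }
        return consistent;
    }

    public static void main(String[] args) {
        System.out.println("snapshots consistent: " + run(100_000));
    }
}
```

If the synchronized block in run() is dropped, a snapshot can land between collect() and the counter update, so the restored state would no longer match what was already emitted.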


Regarding the checkpoint sizes: you can rely on the web interface
reporting correct metrics. However, the "average" values may not be very
useful for you since you are using a sliding count window and thus
during ramp-up (until you get your 10000 windows of the slide size) you
will have smaller states than after that. Since you only have 2 keys,
you will eventually have 20000 window states to store and from then on
stay with this number. So rather look at the "History" column of the web
interface or into the JobManager log.
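
To get per-checkpoint values out of the JobManager log instead of averages, something along these lines may help. It is a minimal sketch that assumes the "Completed checkpoint N (X bytes in Y ms)." message format shown in the log excerpt in this thread (Flink 1.4.1); other versions may word the message differently. The class and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Extracts per-checkpoint size and duration from JobManager log text. */
final class CheckpointLogStats {

    /** One "Completed checkpoint" entry. */
    static final class Entry {
        final long id;
        final long bytes;
        final long millis;

        Entry(long id, long bytes, long millis) {
            this.id = id;
            this.bytes = bytes;
            this.millis = millis;
        }
    }

    // Matches e.g. "Completed checkpoint 1 (244193 bytes in 155 ms)."
    private static final Pattern COMPLETED =
            Pattern.compile("Completed checkpoint (\\d+) \\((\\d+) bytes in (\\d+) ms\\)");

    /** Returns one entry per completed checkpoint, in the order found in the text. */
    static List<Entry> parse(String logText) {
        List<Entry> entries = new ArrayList<>();
        Matcher m = COMPLETED.matcher(logText);
        while (m.find()) {
            entries.add(new Entry(
                    Long.parseLong(m.group(1)),
                    Long.parseLong(m.group(2)),
                    Long.parseLong(m.group(3))));
        }
        return entries;
    }

    public static void main(String[] args) {
        // Two lines taken from the log excerpt quoted in this thread.
        String sample =
                "CheckpointCoordinator - Completed checkpoint 1 (244193 bytes in 155 ms).\n"
              + "CheckpointCoordinator - Completed checkpoint 2 (257342 bytes in 46 ms).\n";
        for (Entry e : parse(sample)) {
            System.out.println(e.id + "\t" + e.bytes + " bytes\t" + e.millis + " ms");
        }
    }
}
```

Plotting the resulting sizes against the checkpoint id makes the ramp-up phase visible, which an average over the whole run hides.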


Regarding the original issue: I was recently made aware of another thing
which may influence the speed of an incremental snapshot: if done
incrementally, we need to close and flush RocksDB's sst file so that it
continues with a new file and we can hardlink and copy a consistent
snapshot. For full snapshots, we simply iterate over all items to copy.
Now this close-and-flush may be more costly (hence the higher duration)
and since this cannot be done asynchronously (as it can for a full
snapshot) we also may not process as many records.
-> Therefore, you probably did not run your program long enough to
create the full set of windows and I'm guessing, you will eventually get
to the same checkpoint sizes.
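
The ramp-up can be estimated with simple arithmetic. The sketch below assumes that a sliding count window keeps at most WINDOWLENGTH entries per key and that elements are spread evenly over the keys; the class and method names are invented for the example, and the window length and key count are the ones mentioned in this thread.

```java
/** Rough estimate of how many entries a sliding count window retains during ramp-up. */
final class CountWindowRampUp {

    /** Entries retained for one key after `seen` elements have arrived for that key. */
    static long retainedPerKey(long seen, int windowLength) {
        return Math.min(seen, windowLength);
    }

    /** Entries retained over all keys, assuming each key has seen the same number of elements. */
    static long retainedTotal(long seenPerKey, int windowLength, int keys) {
        return keys * retainedPerKey(seenPerKey, windowLength);
    }

    public static void main(String[] args) {
        int windowLength = 10000; // WINDOWLENGTH from the posted program
        int keys = 2;             // number of keys mentioned above
        for (long seen : new long[] {100, 1000, 10000, 100000}) {
            System.out.println(seen + " elements per key -> "
                    + retainedTotal(seen, windowLength, keys) + " retained entries");
        }
    }
}
```

The retained count grows linearly until each key has seen 10000 elements and then stays at 20000, so checkpoints taken before that point are smaller than the steady-state ones.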


TL;DR: incremental snapshots are only worthwhile (and are designed for
cases) where you have a lot of operator state (not just a few MB!) while
only a few parts actually change between checkpoints. In these scenarios,
the time saved when transferring such a snapshot to the checkpoint store
over the network would outweigh the additional cost during snapshot creation.


Nico


On 21/03/18 06:01, Miyuru Dayarathna wrote:
> Hi,
>
> Since we could not observe log messages such as "Asynchronous RocksDB
> snapshot" in Flink's log files, we ran the application with Flink
> 1.3.3 as well. But it also did not print the log message. Hence I am
> wondering whether we ran Flink's incremental checkpointing in the
> correct manner. I have attached the complete application with this
> email. Could you please run this in your setup and let me know whether
> you get the incremental checkpoint related logs printed in your Flink setup?
>
> Thanks,
> Miyuru
