Optimizing for super long checkpoint times


Rex Fenley
Hi,

We're running a job with more than 100 GiB of state. For our initial run we wanted to keep things simple, so we allocated a single core node with 1 TaskManager, a parallelism of 1, and 1 TiB of storage (split between 4 disks on that machine). Overall, things are moving pretty smoothly, except for checkpointing. Checkpoints are set to be incremental, yet they're all in the range of 10-20 GiB (we do have a lot of data being updated in real time, retracts plus appends), and they take around 10-30 min. We have the TaskManager set to checkpoint every 5 min, which means we're spending the majority of our time just checkpointing.

My question is, what sort of bottlenecks should we be investigating and what are some things we can try to improve our checkpoint times?

Some things we're considering are:
Increasing parallelism, hoping that this will partition the data and each operator will therefore checkpoint faster.
Changing the time between checkpoints, though we don't have a good understanding of how this might affect the total checkpoint time.
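
For the second option, a rough back-of-the-envelope sketch in Java may help frame the trade-off. It assumes a simplified model in which only one checkpoint runs at a time, so the next checkpoint starts at whichever comes later: the configured interval, or the end of the previous checkpoint plus a minimum pause. The class, method, and parameter names are hypothetical and are not Flink APIs; the durations are the ones quoted in this post.

```java
import java.time.Duration;

/** Back-of-the-envelope model of checkpoint overhead; not a Flink API. */
public class CheckpointOverhead {

    /**
     * Fraction of wall-clock time during which a checkpoint is in progress,
     * assuming at most one checkpoint runs at a time (a simplifying assumption).
     */
    static double fractionSpentCheckpointing(
            Duration interval, Duration checkpointDuration, Duration minPause) {
        // A new cycle starts at the interval, or right after the previous
        // checkpoint plus the pause, whichever is later.
        long cycleMillis = Math.max(
            interval.toMillis(),
            checkpointDuration.plus(minPause).toMillis());
        return (double) checkpointDuration.toMillis() / cycleMillis;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMinutes(5);
        // 2 min is a hypothetical target; 10 and 30 min are the observed range.
        for (int minutes : new int[] {2, 10, 30}) {
            double fraction = fractionSpentCheckpointing(
                interval, Duration.ofMinutes(minutes), Duration.ZERO);
            System.out.printf(
                "checkpoint takes %d min -> %.0f%% of time in a checkpoint%n",
                minutes, fraction * 100);
        }
        // Hypothetical: force a 10 min pause after each 10 min checkpoint.
        double withPause = fractionSpentCheckpointing(
            interval, Duration.ofMinutes(10), Duration.ofMinutes(10));
        System.out.printf("with a 10 min pause -> %.0f%%%n", withPause * 100);
    }
}
```

Under this model, any checkpoint that takes longer than the interval keeps the job in a checkpoint all the time. A longer interval or a pause lowers that fraction, but it does not make an individual checkpoint faster.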

Also, we are hesitant to use unaligned checkpointing at the moment and are hoping for some other options.

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Re: Optimizing for super long checkpoint times

Steven Wu
On Sat, Dec 12, 2020 at 10:54 AM Rex Fenley <[hidden email]> wrote:
> things are actually moving pretty smoothly

Do you mean the job is otherwise healthy, i.e. there is no lag, etc.?

Do you see any bottleneck at the system level, such as CPU, network, or disk I/O?

Re: Optimizing for super long checkpoint times

Rex Fenley
Yes, other than the very long checkpoint latency, everything is healthy with no lag.

A few observations: during a checkpoint, everything appears to be waiting on one join to finish, while most other operators finish in between 50 ms and 2 min.
The operator:
Join(joinType=[LeftOuterJoin], where=[(id = user_id)], select=[id, uuid, first_name, last_name, signature, prefix, birthdate, user_roles, group_ids, organization_ids, teacher_organization_ids, admin_organization_ids, user_id, owner_teacher_or_admin_archived_group_ids], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, uuid, first_name, last_name, signature, prefix, birthdate, user_roles, group_ids, organization_ids, teacher_organization_ids, admin_organization_ids, owner_teacher_or_admin_archived_group_ids])

Also, while everything is waiting on that operator to finish, all our resources seem basically idle, and then at the end there's a big spike in network bytes out before the operators go back to executing. In the first graph you can see an initial spike, where presumably every operator but one does its checkpointing, then a long idle period, then a final spike, and then everything goes back to work. The other graphs are included to show the idle time and disk latency. The checkpoint starts at 16:56 and ends at 17:13:
Screen Shot 2020-12-12 at 5.15.36 PM.png
Screen Shot 2020-12-12 at 5.16.00 PM.png

Screen Shot 2020-12-12 at 5.16.34 PM.png

Screen Shot 2020-12-12 at 5.16.07 PM.png

On Sat, Dec 12, 2020 at 11:16 AM Steven Wu <[hidden email]> wrote:
> Do you mean the job is otherwise healthy? like there is no lag etc.
>
> Do you see any bottleneck at system level, like CPU, network, disk I/O etc.?

Re: Optimizing for super long checkpoint times

Steven Wu
Maybe take a thread dump while the checkpoint is in progress?

On Sat, Dec 12, 2020 at 5:23 PM Rex Fenley <[hidden email]> wrote:
> it appears that everything is waiting on 1 join to finish during a checkpoint

Re: Optimizing for super long checkpoint times

Rex Fenley
I like that idea, though I'm not sure of the best way to go about it. Do you have any suggestions?

On Sun, Dec 13, 2020 at 11:06 AM Steven Wu <[hidden email]> wrote:
> maybe do a thread dump while the checkpoint is in progress?

Re: Optimizing for super long checkpoint times

Jaffe, Julian

If you’re using Flink 1.11 or later, you should be able to take thread dumps via the UI. If you have access to the machines that are running the actual Flink processes, you can use jcmd or one of the other Java CLI tools. If you’re using an earlier version of Flink and don’t have the necessary permissions on the physical hosts, you’ll have to add a little code to do this manually. Here is some sample code cobbled together from Stack Overflow:

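    // Imports needed by the enclosing class:
    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadInfo;
    import java.lang.management.ThreadMXBean;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    /**
     * Print a thread dump and per-thread CPU usage information to the task manager log.
     * LOG is assumed to be the enclosing class's logger (e.g. an SLF4J Logger).
     */
    private void takeThreadDump() {
        StringBuilder threadDump = new StringBuilder(System.lineSeparator());
        long id = Thread.currentThread().getId();
        threadDump.append(
            String.format("******************************** Thread Dump in thread %d:\n\n", id));
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        // Thread ID -> CPU time in nanoseconds (-1 if the thread has died or
        // CPU time measurement is disabled).
        Map<Long, Long> usageMap = new HashMap<>();
        for (ThreadInfo threadInfo : threadMXBean.dumpAllThreads(true, true)) {
            usageMap.put(
                threadInfo.getThreadId(),
                threadMXBean.getThreadCpuTime(threadInfo.getThreadId())
            );
            threadDump.append(threadInfo.toString());
        }
        threadDump.append("\n\n");
        // One "threadId: cpuTimeNanos" line per thread.
        threadDump.append(
            usageMap
                .entrySet()
                .stream()
                .map(e -> e.getKey().toString() + ": " + e.getValue())
                .collect(Collectors.joining("\n"))
        );
        threadDump.append("\n\n********************************");
        LOG.info(threadDump.toString());
    }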

Note that you won’t get native thread IDs (nids) with this approach. If you need more context in the stack traces, you’ll have to replace the threadInfo.toString() call with a separate function that walks deeper into the stack. For instance, to bump the hard-coded max frames from 8 to 32:

    // Requires: import java.lang.management.LockInfo;
    //           import java.lang.management.MonitorInfo;
    //           import java.lang.management.ThreadInfo;

    /**
     * Copied from ThreadInfo.toString() but with MAX_FRAMES bumped to 32.
     *
     * @param thread The ThreadInfo object to generate a stack trace for.
     * @return The stack trace for THREAD with up to 32 frames included.
     */
    private String longThreadStackTraceString(ThreadInfo thread) {
        StringBuilder sb = new StringBuilder("\"" + thread.getThreadName() + "\"" +
            " Id=" + thread.getThreadId() + " " +
            thread.getThreadState());
        if (thread.getLockName() != null) {
            sb.append(" on " + thread.getLockName());
        }
        if (thread.getLockOwnerName() != null) {
            sb.append(" owned by \"" + thread.getLockOwnerName() +
                "\" Id=" + thread.getLockOwnerId());
        }
        if (thread.isSuspended()) {
            sb.append(" (suspended)");
        }
        if (thread.isInNative()) {
            sb.append(" (in native)");
        }
        sb.append('\n');
        int i = 0;
        StackTraceElement[] stackTrace = thread.getStackTrace();
        for (; i < stackTrace.length && i < 32; i++) {
            StackTraceElement ste = stackTrace[i];
            sb.append("\tat " + ste.toString());
            sb.append('\n');
            // The lock the thread is blocked or waiting on belongs to the top frame.
            if (i == 0 && thread.getLockInfo() != null) {
                Thread.State ts = thread.getThreadState();
                switch (ts) {
                    case BLOCKED:
                        sb.append("\t-  blocked on " + thread.getLockInfo());
                        sb.append('\n');
                        break;
                    case WAITING:
                    case TIMED_WAITING:
                        sb.append("\t-  waiting on " + thread.getLockInfo());
                        sb.append('\n');
                        break;
                    default:
                        break;
                }
            }

            // Monitors acquired in this frame.
            for (MonitorInfo mi : thread.getLockedMonitors()) {
                if (mi.getLockedStackDepth() == i) {
                    sb.append("\t-  locked " + mi);
                    sb.append('\n');
                }
            }
        }
        // Mark the trace as truncated if frames were left out.
        if (i < stackTrace.length) {
            sb.append("\t...");
            sb.append('\n');
        }

        LockInfo[] locks = thread.getLockedSynchronizers();
        if (locks.length > 0) {
            sb.append("\n\tNumber of locked synchronizers = " + locks.length);
            sb.append('\n');
            for (LockInfo li : locks) {
                sb.append("\t- " + li);
                sb.append('\n');
            }
        }
        sb.append('\n');
        return sb.toString();
    }

Then you can use longThreadStackTraceString(threadInfo) instead of threadInfo.toString().

If you have some smart way to trigger the thread dumps (e.g. a control channel in your Flink app, or some way of detecting a long-running checkpoint), use that to capture the thread dump. Otherwise, just set up a new scheduled thread pool and call takeThreadDump() every n minutes. Make sure to take the thread dump on whichever processes are actually doing the work (likely the task managers)!
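
The scheduled-thread-pool fallback could look roughly like the sketch below. It is a minimal, standalone illustration rather than Flink code: the class name PeriodicThreadDumper and the buildThreadDump() helper are hypothetical stand-ins for takeThreadDump(), and it prints to stdout where a real job would use its logger.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Periodically prints a thread dump from a daemon thread; a sketch, not Flink code. */
public class PeriodicThreadDumper implements AutoCloseable {

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "thread-dumper");
            t.setDaemon(true); // never keep the JVM alive just for diagnostics
            return t;
        });

    /** Builds a dump of all live threads (stand-in for takeThreadDump()). */
    static String buildThreadDump() {
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(true, true)) {
            sb.append(info);
        }
        return sb.toString();
    }

    /** Dumps every periodMinutes minutes, starting one period from now. */
    public void start(long periodMinutes) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                System.out.println(buildThreadDump()); // use LOG.info(...) in a real job
            } catch (RuntimeException e) {
                // An uncaught exception would silently cancel all future runs.
                System.err.println("Thread dump failed: " + e);
            }
        }, periodMinutes, periodMinutes, TimeUnit.MINUTES);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        try (PeriodicThreadDumper dumper = new PeriodicThreadDumper()) {
            dumper.start(1);
            // Take one dump immediately to show the helper works.
            System.out.println(buildThreadDump().isEmpty() ? "empty dump" : "dump ok");
        }
    }
}
```

In a Flink job, one option would be to start this from the open() method of a rich function and stop it in close(), so that it runs inside the task manager processes. That placement is a suggestion and has not been tested here.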

 

Julian

 

From: Rex Fenley <[hidden email]>
Date: Sunday, December 13, 2020 at 11:24 AM
To: Steven Wu <[hidden email]>
Cc: user <[hidden email]>, Brad Davis <[hidden email]>
Subject: Re: Optimizing for super long checkpoint times

 

I like that idea though I'm not sure the best way to go about that. Do you have any suggestions?

 


Re: Optimizing for super long checkpoint times

Rex Fenley
Ah wow, we're using 1.11.2 so I'll try out using the UI. If for whatever reason that doesn't work, I really appreciate you including some code to work off of.

Thanks!

On Mon, Dec 14, 2020 at 10:34 AM Jaffe, Julian <[hidden email]> wrote:
> If you’re using Flink 1.11 or later, you should be able to take thread dumps via the UI.