/*
 * Copyright (c) 2014-present MongoDB, Inc.
 * Copyright (c) 2008-2014 WiredTiger, Inc.
 *	All rights reserved.
 *
 * See the file LICENSE for redistribution information.
 */

#include "wt_internal.h"

/*
 * WT_RESERVATION_NS --
 *     Compute the time in nanoseconds that must be reserved to represent a number of bytes in a
 *     subsystem with a particular capacity per second.
 *
 *     The multiplication is done before the division, so "bytes" has to be small enough that the
 *     product still fits in the operand type (the throttle functions in this file assert that bytes
 *     is below 16GB before using the macro). "capacity" is the divisor and must be non-zero.
 */
#define WT_RESERVATION_NS(bytes, capacity) (((bytes)*WT_BILLION) / (capacity))

/*
 * WT_STEAL_FRACTION --
 *     The fraction of a second's worth of capacity that will be stolen at a time: one sixteenth of
 *     the argument, using integer division. The number of bytes this represents may be different
 *     for different subsystems, since each subsystem has its own capacity per second. In this file
 *     it is applied both to WT_BILLION (giving a length of time) and to a subsystem capacity
 *     (giving a byte count).
 */
#define WT_STEAL_FRACTION(x) ((x) / 16)

/*
 * __capacity_config --
 *     Set I/O capacity configuration.
 *
 *     Reads "io_capacity.total" and "io_capacity.chunk_cache" from the configuration, validates
 *     them and stores the resulting per-subsystem capacities and the background flush threshold in
 *     the connection's capacity structure. A value of zero leaves the corresponding throttle
 *     disabled. The chunk cache capacity is carved out of the total, so it can never exceed the
 *     total and must leave at least WT_THROTTLE_MIN for the other subsystems.
 *
 *     Returns 0 on success, EINVAL (with a message) if a value is out of range, or the error from
 *     the configuration lookup.
 */
static int
__capacity_config(WT_SESSION_IMPL *session, const char *cfg[])
{
    WT_CAPACITY *cap;
    WT_CONFIG_ITEM cval;
    WT_CONNECTION_IMPL *conn;
    uint64_t chunkcache, total;

    conn = S2C(session);
    chunkcache = total = 0;

    WT_RET(__wt_config_gets(session, cfg, "io_capacity.total", &cval));
    if (cval.val != 0) {
        if (cval.val < WT_THROTTLE_MIN)
            WT_RET_MSG(session, EINVAL, "total I/O capacity value %" PRId64 " below minimum %d",
              cval.val, WT_THROTTLE_MIN);
        total = (uint64_t)cval.val;
    }

    WT_RET(__wt_config_gets(session, cfg, "io_capacity.chunk_cache", &cval));
    if (cval.val != 0) {
        /*
         * Check the minimum on the signed configuration value, before converting it to unsigned.
         * Otherwise a negative value turns into a huge unsigned number that slips past the minimum
         * check.
         */
        if (cval.val < WT_THROTTLE_MIN)
            WT_RET_MSG(session, EINVAL,
              "chunk cache I/O capacity value %" PRId64 " below minimum %d", cval.val,
              WT_THROTTLE_MIN);
        chunkcache = (uint64_t)cval.val;

        /* The chunk cache share comes out of the total, it cannot be larger than the total. */
        if (total < chunkcache)
            WT_RET_MSG(session, EINVAL,
              "chunk cache I/O capacity value %" PRIu64 " exceeds total %" PRIu64, chunkcache,
              total);
        if ((total - chunkcache) < WT_THROTTLE_MIN)
            WT_RET_MSG(session, EINVAL,
              "chunk cache I/O capacity value %" PRIu64
              " leaves insufficient capacity for other subsystems (total %" PRIu64
              ", remaining %" PRIu64 ")",
              chunkcache, total, total - chunkcache);

        /* What is left over is shared between the remaining subsystems. */
        total -= chunkcache;
    }

    cap = &conn->capacity;
    cap->chunkcache = chunkcache;
    __wt_atomic_store64(&cap->total, total);
    if (total != 0) {
        /*
         * We've been given a total capacity, set the capacity of all the subsystems.
         */
        cap->ckpt = WT_CAPACITY_SYS(total, WT_CAP_CKPT);
        cap->evict = WT_CAPACITY_SYS(total, WT_CAP_EVICT);
        cap->log = WT_CAPACITY_SYS(total, WT_CAP_LOG);
        cap->read = WT_CAPACITY_SYS(total, WT_CAP_READ);

        /*
         * Set the threshold to the percent of our capacity to periodically asynchronously flush
         * what we've written. Only the subsystems that write (checkpoint, eviction, log) count
         * towards it, and it is never allowed to drop below the minimum threshold.
         */
        cap->threshold = ((cap->ckpt + cap->evict + cap->log) / 100) * WT_CAPACITY_PCT;
        if (cap->threshold < WT_CAPACITY_MIN_THRESHOLD)
            cap->threshold = WT_CAPACITY_MIN_THRESHOLD;
        WT_STAT_CONN_SET(session, capacity_threshold, cap->threshold);
    } else
        WT_STAT_CONN_SET(session, capacity_threshold, 0);

    return (0);
}

/*
 * __capacity_server_run_chk --
 *     Report whether the capacity server flag is still set on the connection, that is, whether the
 *     capacity server thread should keep going.
 */
static bool
__capacity_server_run_chk(WT_SESSION_IMPL *session)
{
    WT_CONNECTION_IMPL *conn;

    conn = S2C(session);
    return (FLD_ISSET(conn->server_flags, WT_CONN_SERVER_CAPACITY));
}

/*
 * __capacity_server --
 *     Thread entry point for the capacity server. Each time it wakes up with at least the
 *     threshold number of bytes written, it runs a background fsync, records how long that took
 *     and resets the written counter. A failing fsync panics the connection.
 */
static WT_THREAD_RET
__capacity_server(void *arg)
{
    WT_CAPACITY *cap;
    WT_CONNECTION_IMPL *conn;
    WT_DECL_RET;
    WT_SESSION_IMPL *session;
    uint64_t begin, elapsed_ms, end;

    session = arg;
    conn = S2C(session);
    cap = &conn->capacity;

    for (;;) {
        /* Sleep until signalled; the timeout covers a wake-up that was missed. */
        __wt_cond_wait(session, conn->capacity_cond, WT_MILLION, __capacity_server_run_chk);

        /* The run flag is cleared on shutdown and on reconfiguration. */
        if (!__capacity_server_run_chk(session))
            break;

        /* Allow writers to signal us again before looking at the counter. */
        cap->signalled = false;
        if (cap->written >= cap->threshold) {
            begin = __wt_clock(session);
            WT_ERR(__wt_fsync_background(session));
            end = __wt_clock(session);
            elapsed_ms = WT_CLOCKDIFF_MS(end, begin);
            WT_STAT_CONN_SET(session, fsync_all_time, elapsed_ms);
            cap->written = 0;
        }
    }
    return (WT_THREAD_RET_VALUE);

err:
    WT_IGNORE_RET(__wt_panic(session, ret, "capacity server error"));
    return (WT_THREAD_RET_VALUE);
}

/*
 * __capacity_server_start --
 *     Bring up the capacity server: set its run flag, open its internal session, allocate its
 *     condition variable and create the thread. Returns the first error encountered, or 0.
 */
static int
__capacity_server_start(WT_CONNECTION_IMPL *conn)
{
    WT_SESSION_IMPL *server_session;

    /* The flag is set before the thread is created so the thread's run check succeeds. */
    FLD_SET(conn->server_flags, WT_CONN_SERVER_CAPACITY);

    /* The server runs in a dedicated internal session, stored on the connection. */
    WT_RET(
      __wt_open_internal_session(conn, "capacity-server", false, 0, 0, &conn->capacity_session));
    server_session = conn->capacity_session;

    /* Writers use this condition variable to wake the server. */
    WT_RET(__wt_cond_alloc(server_session, "capacity server", &conn->capacity_cond));

    /* Remember the thread was created so destroy knows there is something to join. */
    WT_RET(
      __wt_thread_create(server_session, &conn->capacity_tid, __capacity_server, server_session));
    conn->capacity_tid_set = true;

    return (0);
}

/*
 * __wti_capacity_server_create --
 *     Apply the I/O capacity configuration and, when a total capacity is set and the connection can
 *     use it, start the capacity server.
 */
int
__wti_capacity_server_create(WT_SESSION_IMPL *session, const char *cfg[])
{
    WT_CONNECTION_IMPL *conn;

    conn = S2C(session);

    /*
     * Always tear down an existing server first, even if the configuration turns out to be
     * unchanged. Restarting from a clean state on every reconfigure means the configuration code
     * never has to free or patch up live connection state, and a running server can never observe
     * settings that are in the middle of being updated. Reconfiguration is expected to be rare, so
     * the extra restart is cheap.
     */
    if (conn->capacity_session != NULL)
        WT_RET(__wti_capacity_server_destroy(session));
    WT_RET(__capacity_config(session, cfg));

    /* In-memory and read-only connections have no use for the server. */
    if (F_ISSET(conn, WT_CONN_IN_MEMORY | WT_CONN_READONLY))
        return (0);

    /* Neither does a connection where background fsync is not supported. */
    if (!__wt_fsync_background_chk(session))
        return (0);

    /* Without a total capacity there is no throttling, and so nothing to serve. */
    if (conn->capacity.total == 0)
        return (0);

    return (__capacity_server_start(conn));
}

/*
 * __wti_capacity_server_destroy --
 *     Stop the capacity server thread, if there is one, and release its condition variable and
 *     session. Keeps going after a failure and returns the error recorded along the way, or 0.
 */
int
__wti_capacity_server_destroy(WT_SESSION_IMPL *session)
{
    WT_CONNECTION_IMPL *conn;
    WT_DECL_RET;

    conn = S2C(session);

    /* Clear the run flag first, then wake the thread so it notices and exits. */
    FLD_CLR(conn->server_flags, WT_CONN_SERVER_CAPACITY);
    if (conn->capacity_tid_set) {
        __wt_cond_signal(session, conn->capacity_cond);
        WT_TRET(__wt_thread_join(session, &conn->capacity_tid));
        conn->capacity_tid_set = false;
    }

    __wt_cond_destroy(session, &conn->capacity_cond);

    /* The server's internal session goes away with the thread. */
    if (conn->capacity_session != NULL)
        WT_TRET(__wt_session_close_internal(conn->capacity_session));

    /*
     * Leave the connection with no trace of the server, a later reconfigure decides whether there
     * is anything to tear down by looking at these fields. The thread-set flag is already false at
     * this point, whether or not a thread was joined above.
     */
    conn->capacity_cond = NULL;
    conn->capacity_session = NULL;

    return (ret);
}

/*
 * __capacity_signal --
 *     Wake the capacity thread once the written byte count has reached the threshold, unless a
 *     signal has already been sent and not yet consumed.
 */
static void
__capacity_signal(WT_SESSION_IMPL *session)
{
    WT_CAPACITY *cap;
    WT_CONNECTION_IMPL *conn;

    conn = S2C(session);
    cap = &conn->capacity;

    /* Too little written so far, or the server has already been told. */
    if (cap->written < cap->threshold || cap->signalled)
        return;

    __wt_cond_signal(session, conn->capacity_cond);
    cap->signalled = true;
}

/*
 * __capacity_reserve --
 *     Reserve a time slot for the given number of bytes against a subsystem's capacity. The
 *     reservation marker is advanced by the length of the slot and the value returned by that
 *     atomic add is handed back through "result". With a capacity of zero nothing is reserved and
 *     the result is simply the current time.
 */
static void
__capacity_reserve(
  uint64_t *reservation, uint64_t bytes, uint64_t capacity, uint64_t now_ns, uint64_t *result)
{
    uint64_t slot_len, slot_value;

    /* An unthrottled subsystem never has to wait. */
    if (capacity == 0) {
        *result = now_ns;
        return;
    }

    slot_len = WT_RESERVATION_NS(bytes, capacity);
    slot_value = __wt_atomic_add64(reservation, slot_len);

    /*
     * A reservation marker that has fallen more than a second behind the current time is stale.
     * Pull it forward so that it sits within a second of now.
     */
    if (now_ns > slot_value && now_ns - slot_value > WT_BILLION)
        *reservation = (now_ns - WT_BILLION) + slot_len;

    *result = slot_value;
}

/*
 * __throttle_chunkcache --
 *     Throttle a chunk cache read or write: take a reservation against the chunk cache capacity
 *     and sleep until the reserved time if it lies far enough in the future. The chunk cache is
 *     the only subsystem with its own separate IO throttle; ideally any future subsystem-specific
 *     throttles would be folded into this implementation.
 */
static void
__throttle_chunkcache(WT_SESSION_IMPL *session, WT_CAPACITY *cap, uint64_t bytes)
{
    struct timespec ts;
    uint64_t capacity, current_ns, slot_ns, wait_us;

    capacity = cap->chunkcache;

    /* The byte counters are maintained whether or not the I/O ends up being throttled. */
    WT_STAT_CONN_INCRV(session, capacity_bytes_chunkcache, bytes);
    WT_STAT_CONN_INCRV(session, capacity_bytes_written, bytes);

    /* No throttling without a configured capacity, or while the connection is recovering. */
    if (capacity == 0 || F_ISSET(S2C(session), WT_CONN_RECOVERING))
        return;

    __capacity_signal(session);

    /* Sizes at or beyond this limit could overflow the calculations that follow. */
    WT_ASSERT(session, bytes < 16 * (uint64_t)WT_GIGABYTE);
    WT_ASSERT(session, capacity != 0);

    /* Current time, in nanoseconds since the epoch. */
    __wt_epoch(session, &ts);
    current_ns = (uint64_t)ts.tv_sec * WT_BILLION + (uint64_t)ts.tv_nsec;

    /* Reserve a slot for this I/O. */
    __capacity_reserve(&cap->reservation_chunkcache, bytes, capacity, current_ns, &slot_ns);

    /* A slot that isn't in the future means there is nothing to wait for. */
    if (slot_ns <= current_ns)
        return;

    wait_us = (slot_ns - current_ns) / WT_THOUSAND;
    WT_STAT_CONN_INCRV(session, capacity_time_chunkcache, wait_us);

    /* Waits at or below the cutoff are skipped; sleep copes with large usec values. */
    if (wait_us > WT_CAPACITY_SLEEP_CUTOFF_US)
        __wt_sleep(0, wait_us);
}

/*
 * __wt_capacity_throttle --
 *     Reserve a time to perform a write operation for the subsystem, and wait until that time. The
 *     concept is that each write to a subsystem reserves a time slot to do its write, and
 *     atomically adjusts the reservation marker to point past the reserved slot. The size of the
 *     adjustment (i.e. the length of time represented by the slot in nanoseconds) is chosen to be
 *     proportional to the number of bytes to be written, and the proportion is a simple calculation
 *     so that we can fit reservations for exactly the configured capacity in a second. Reservation
 *     times are in nanoseconds since the epoch.
 *
 *     "bytes" is the size of the I/O and "type" selects the subsystem being charged. Chunk cache
 *     requests are handed straight to __throttle_chunkcache. For the other types the byte
 *     statistics are always updated; the function then returns without throttling if there is no
 *     total capacity, the subsystem has no capacity, or the connection is recovering. Otherwise a
 *     slot is reserved against both the subsystem and the total capacity, idle time may be taken
 *     from another subsystem, and the caller sleeps until the later of the two reserved times.
 */
void
__wt_capacity_throttle(WT_SESSION_IMPL *session, uint64_t bytes, WT_THROTTLE_TYPE type)
{
    struct timespec now;
    WT_CAPACITY *cap;
    WT_CONNECTION_IMPL *conn;
    uint64_t best_res, capacity, new_res, now_ns, sleep_us, res_total_value;
    uint64_t res_value, steal_capacity, stolen_bytes, this_res;
    uint64_t *reservation, *steal;
    uint64_t total_capacity;

    conn = S2C(session);
    cap = &conn->capacity;
    steal_capacity = 0;
    reservation = steal = NULL;

    /* Quiet warnings from both gcc and clang about this variable. */
    WT_NOT_READ(capacity, 0);

    /*
     * Pick the capacity and reservation marker of the subsystem being charged, and account for the
     * bytes. Reads are counted separately and do not add to the bytes-written statistic.
     */
    switch (type) {
    case WT_THROTTLE_CHUNKCACHE:
        /* The chunk cache has its own throttle and takes no part in the logic below. */
        __throttle_chunkcache(session, cap, bytes);
        return;
    case WT_THROTTLE_CKPT:
        capacity = cap->ckpt;
        reservation = &cap->reservation_ckpt;
        WT_STAT_CONN_INCRV(session, capacity_bytes_ckpt, bytes);
        WT_STAT_CONN_INCRV(session, capacity_bytes_written, bytes);
        break;
    case WT_THROTTLE_EVICT:
        capacity = cap->evict;
        reservation = &cap->reservation_evict;
        WT_STAT_CONN_INCRV(session, capacity_bytes_evict, bytes);
        WT_STAT_CONN_INCRV(session, capacity_bytes_written, bytes);
        break;
    case WT_THROTTLE_LOG:
        capacity = cap->log;
        reservation = &cap->reservation_log;
        WT_STAT_CONN_INCRV(session, capacity_bytes_log, bytes);
        WT_STAT_CONN_INCRV(session, capacity_bytes_written, bytes);
        break;
    case WT_THROTTLE_READ:
        capacity = cap->read;
        reservation = &cap->reservation_read;
        WT_STAT_CONN_INCRV(session, capacity_bytes_read, bytes);
        break;
    }
    /*
     * NOTE(review): the total is loaded here and then loaded a second time in the check just below.
     * The two loads are independent, so they could presumably observe different values if the
     * total is changed in between; the check could test total_capacity instead. Confirm whether
     * that matters.
     */
    total_capacity = __wt_atomic_load64(&cap->total);

    /*
     * Right now no subsystem can be individually turned off, but it is certainly a possibility to
     * consider one subsystem may be turned off at some point in the future. If this subsystem is
     * not throttled there's nothing to do.
     */
    if (__wt_atomic_load64(&cap->total) == 0 || capacity == 0 || F_ISSET(conn, WT_CONN_RECOVERING))
        return;

    /*
     * There may in fact be some reads done under the umbrella of log I/O, but they are mostly done
     * under recovery. And if we are recovering, we don't reach this code.
     *
     * Only writes count towards the written total that is compared against the flush threshold.
     */
    if (type != WT_THROTTLE_READ) {
        (void)__wt_atomic_addv64(&cap->written, bytes);
        __capacity_signal(session);
    }

    /* If we get sizes larger than this, later calculations may overflow. */
    WT_ASSERT(session, bytes < 16 * (uint64_t)WT_GIGABYTE);
    WT_ASSERT(session, capacity != 0);

    /* Get the current time in nanoseconds since the epoch. */
    __wt_epoch(session, &now);
    now_ns = (uint64_t)now.tv_sec * WT_BILLION + (uint64_t)now.tv_nsec;

    /*
     * We come back here at most once, after losing the race to steal from another subsystem. The
     * current time is not sampled again for the second pass.
     */
again:
    /* Take a reservation for the subsystem, and for the total */
    __capacity_reserve(reservation, bytes, capacity, now_ns, &res_value);
    __capacity_reserve(&cap->reservation_total, bytes, total_capacity, now_ns, &res_total_value);

    /*
     * If we ended up with a future reservation, and we aren't constricted by the total capacity,
     * then we may be able to reallocate some unused reservation time from another subsystem.
     *
     * The test on "steal" limits this to a single attempt: it is still set when we loop back after
     * a failed attempt, so the block is skipped the second time around.
     */
    if (res_value > now_ns && res_total_value < now_ns && steal == NULL && total_capacity != 0) {
        /*
         * Look at every other subsystem for a reservation marker that is more than half of
         * WT_BILLION nanoseconds behind the current time. Each test compares against the best
         * value found so far, so we end up with the subsystem whose marker is furthest behind.
         */
        best_res = now_ns - WT_BILLION / 2;
        if (type != WT_THROTTLE_CKPT && (this_res = cap->reservation_ckpt) < best_res) {
            steal = &cap->reservation_ckpt;
            steal_capacity = cap->ckpt;
            best_res = this_res;
        }
        if (type != WT_THROTTLE_EVICT && (this_res = cap->reservation_evict) < best_res) {
            steal = &cap->reservation_evict;
            steal_capacity = cap->evict;
            best_res = this_res;
        }
        if (type != WT_THROTTLE_LOG && (this_res = cap->reservation_log) < best_res) {
            steal = &cap->reservation_log;
            steal_capacity = cap->log;
            best_res = this_res;
        }
        if (type != WT_THROTTLE_READ && (this_res = cap->reservation_read) < best_res) {
            steal = &cap->reservation_read;
            steal_capacity = cap->read;
            best_res = this_res;
        }

        if (steal != NULL) {
            /*
             * We have a subsystem that has enough spare capacity to steal. We'll take a small slice
             * (a fraction of a second worth) and add it to our own subsystem.
             *
             * The victim's new marker starts from its old value, or from WT_BILLION nanoseconds
             * before now if the old value is further behind than that, and is then moved forward
             * by the stolen slice plus the time this I/O would take at the victim's capacity.
             */
            if (best_res < now_ns - WT_BILLION && now_ns > WT_BILLION)
                new_res = now_ns - WT_BILLION;
            else
                new_res = best_res;
            WT_ASSERT(session, steal_capacity != 0);
            new_res += WT_STEAL_FRACTION(WT_BILLION) + WT_RESERVATION_NS(bytes, steal_capacity);
            /* The swap only goes ahead if the victim's marker still holds the value we read. */
            if (!__wt_atomic_casv64(steal, best_res, new_res)) {
                /*
                 * Give up our reservations and try again. We won't try to steal the next time.
                 */
                (void)__wt_atomic_sub64(reservation, WT_RESERVATION_NS(bytes, capacity));
                (void)__wt_atomic_sub64(
                  &cap->reservation_total, WT_RESERVATION_NS(bytes, total_capacity));
                goto again;
            }

            /*
             * We've stolen a fraction of a second of capacity. Figure out how many bytes that is,
             * before adding that many bytes to the acquiring subsystem's capacity.
             *
             * The credit is applied by moving our own reservation marker back by the time those
             * bytes take at our capacity; the value returned by that subtraction becomes our
             * reservation time.
             */
            stolen_bytes = WT_STEAL_FRACTION(steal_capacity);
            res_value = __wt_atomic_sub64(reservation, WT_RESERVATION_NS(stolen_bytes, capacity));
        }
    }
    /* We have to honor both reservations, so wait for whichever one is later. */
    if (res_value < res_total_value)
        res_value = res_total_value;

    if (res_value > now_ns) {
        sleep_us = (res_value - now_ns) / WT_THOUSAND;
        /*
         * Charge the wait to the total capacity statistic when the total reservation is the one
         * being waited for, and to the subsystem's statistic otherwise.
         */
        if (res_value == res_total_value)
            WT_STAT_CONN_INCRV(session, capacity_time_total, sleep_us);
        else
            switch (type) {
            case WT_THROTTLE_CHUNKCACHE:
                /*
                 * This section is not expected to be reached as we should have already returned
                 * earlier in case of chunk cache usage throttling.
                 */
                WT_ASSERT(session, false);
                break;
            case WT_THROTTLE_CKPT:
                WT_STAT_CONN_INCRV(session, capacity_time_ckpt, sleep_us);
                break;
            case WT_THROTTLE_EVICT:
                WT_STAT_CONN_INCRV(session, capacity_time_evict, sleep_us);
                break;
            case WT_THROTTLE_LOG:
                WT_STAT_CONN_INCRV(session, capacity_time_log, sleep_us);
                break;
            case WT_THROTTLE_READ:
                WT_STAT_CONN_INCRV(session, capacity_time_read, sleep_us);
                break;
            }
        /*
         * The wait is recorded in the statistics above even when it is at or below the cutoff and
         * we don't actually sleep.
         */
        if (sleep_us > WT_CAPACITY_SLEEP_CUTOFF_US)
            /* Sleep handles large usec values. */
            __wt_sleep(0, sleep_us);
    }
}
