/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2012-2020 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
#include "config.h"
#include "iotests.h"
#include "internal.h"
#include <map>
#include <include/libcouchbase/utils.h>

using namespace std;

#define LOGARGS(instance, lvl) instance->settings, "tests-dur", LCB_LOG_##lvl, __FILE__, __LINE__
#define SECS_USECS(f) ((f)*1000000)

static bool supportsMutationTokens(lcb_INSTANCE *instance)
{
    // Ensure we have at least one connection
    storeKey(instance, "dummy_stok_test", "dummy");

    int val = 0;
    lcb_STATUS rc;
    rc = lcb_cntl(instance, LCB_CNTL_GET, LCB_CNTL_MUTATION_TOKENS_SUPPORTED, &val);

    EXPECT_EQ(LCB_SUCCESS, rc);
    if (val == 0) {
        printf("Current cluster does not support synctokens!\n");
        return false;
    } else {
        return true;
    }
}

class DurabilityUnitTest : public MockUnitTest
{
  protected:
    static void defaultOptions(lcb_INSTANCE *instance, lcb_durability_opts_st &opts)
    {
        lcb_size_t nservers = lcb_get_num_nodes(instance);
        lcb_size_t nreplicas = lcb_get_num_replicas(instance);

        opts.v.v0.persist_to = (lcb_uint16_t)min(nreplicas + 1, nservers);
        opts.v.v0.replicate_to = (lcb_uint16_t)min(nreplicas, nservers - 1);
    }
};

extern "C" {
static void defaultDurabilityCallback(lcb_INSTANCE *, int, const lcb_RESPENDURE *);
static void multiDurabilityCallback(lcb_INSTANCE *, int, const lcb_RESPENDURE *);
}

class DurabilityOperation
{
  public:
    DurabilityOperation() {}

    string key;
    lcb_RESPENDURE resp;
    lcb_CMDENDURE req;

    void assign(const lcb_RESPENDURE *resp)
    {
        this->resp = *resp;
        key.assign((const char *)resp->ctx.key, resp->ctx.key_len);
        this->resp.ctx.key = NULL;
        this->resp.ctx.key_len = 0;
    }

    void wait(lcb_INSTANCE *instance)
    {
        lcb_install_callback(instance, LCB_CALLBACK_ENDURE, (lcb_RESPCALLBACK)defaultDurabilityCallback);
        EXPECT_EQ(LCB_SUCCESS, lcb_wait(instance, LCB_WAIT_DEFAULT));
    }

    void wait(lcb_INSTANCE *instance, const lcb_durability_opts_t *opts, const lcb_CMDENDURE *cmd, lcb_STATUS expected = LCB_SUCCESS)
    {

        lcb_STATUS rc;
        lcb_MULTICMD_CTX *mctx = lcb_endure3_ctxnew(instance, opts, &rc);
        EXPECT_FALSE(mctx == NULL);
        rc = mctx->addcmd(mctx, (lcb_CMDBASE *)cmd);
        EXPECT_EQ(expected, rc);
        if (rc != LCB_SUCCESS) {
            mctx->fail(mctx);
        } else {
            rc = mctx->done(mctx, this);
            EXPECT_EQ(LCB_SUCCESS, rc);
            wait(instance);
        }
    }

    void run(lcb_INSTANCE *instance, const lcb_durability_opts_t *opts, const Item &itm, lcb_STATUS expected = LCB_SUCCESS)
    {
        lcb_CMDENDURE cmd = {0};
        ASSERT_FALSE(itm.key.empty());
        LCB_CMD_SET_KEY(&cmd, itm.key.data(), itm.key.length());
        cmd.cas = itm.cas;
        wait(instance, opts, &cmd, expected);
    }

    // Really wait(), but named as 'run()' here to make usage more consistent.
    void run(lcb_INSTANCE *instance, const lcb_durability_opts_t *opts, const lcb_CMDENDURE &cmd)
    {
        wait(instance, opts, &cmd);
    }

    void assertCriteriaMatch(const lcb_durability_opts_st &opts)
    {
        ASSERT_EQ(LCB_SUCCESS, resp.ctx.rc);
        ASSERT_TRUE(resp.persisted_master != 0);
        ASSERT_TRUE(opts.v.v0.persist_to <= resp.npersisted);
        ASSERT_TRUE(opts.v.v0.replicate_to <= resp.nreplicated);
    }

    void dump(std::string &str)
    {
        if (key.empty()) {
            str = "<No Key>\n";
            return;
        }
        std::stringstream ss;
        ss << "Key: " << key << std::endl
           << "Error: " << resp.ctx.rc << std::endl
           << "Persisted (master?): " << resp.npersisted << " (" << resp.persisted_master << ")" << std::endl
           << "Replicated: " << resp.nreplicated << std::endl
           << "CAS: 0x" << std::hex << resp.ctx.cas << std::endl;
        str += ss.str();
    }

    void dump()
    {
        string s;
        dump(s);
        cout << s;
    }
};

class DurabilityMultiOperation
{
  public:
    DurabilityMultiOperation() : counter(0) {}

    template < typename T > void run(lcb_INSTANCE *instance, const lcb_durability_opts_t *opts, const T &items)
    {
        counter = 0;
        unsigned ii = 0;
        typename T::const_iterator iter = items.begin();
        lcb_STATUS rc;
        lcb_MULTICMD_CTX *mctx = lcb_endure3_ctxnew(instance, opts, &rc);
        ASSERT_FALSE(mctx == NULL);

        for (; iter != items.end(); iter++, ii++) {
            lcb_CMDENDURE cmd = {0};
            const Item &itm = *iter;

            cmd.cas = itm.cas;
            LCB_CMD_SET_KEY(&cmd, itm.key.c_str(), itm.key.length());
            rc = mctx->addcmd(mctx, (lcb_CMDBASE *)&cmd);
            ASSERT_EQ(LCB_SUCCESS, rc);
            kmap[itm.key] = DurabilityOperation();
        }

        lcb_install_callback(instance, LCB_CALLBACK_ENDURE, (lcb_RESPCALLBACK)multiDurabilityCallback);

        rc = mctx->done(mctx, this);
        ASSERT_EQ(LCB_SUCCESS, rc);
        lcb_wait(instance, LCB_WAIT_DEFAULT);
        ASSERT_EQ(items.size(), counter);
    }

    void assign(const lcb_RESPENDURE *resp)
    {
        ASSERT_GT(resp->ctx.key_len, 0U);
        counter++;

        string key;
        key.assign((const char *)resp->ctx.key, resp->ctx.key_len);
        ASSERT_TRUE(kmap.find(key) != kmap.end());
        kmap[key].assign(resp);
    }

    template < typename T > bool _findItem(const string &s, const T &items, Item &itm)
    {
        for (typename T::const_iterator iter = items.begin(); iter != items.end(); iter++) {
            if (iter->key.compare(s) == 0) {
                itm = *iter;
                return true;
            }
        }
        return false;
    }

    template < typename T >
    void assertAllMatch(const lcb_durability_opts_t &opts, const T &items_ok, const T &items_missing,
                        lcb_STATUS missing_err = LCB_ERR_DOCUMENT_NOT_FOUND)
    {

        for (map< string, DurabilityOperation >::iterator iter = kmap.begin(); iter != kmap.end(); iter++) {

            Item itm_tmp;
            // make sure we were expecting it
            if (_findItem(iter->second.key, items_ok, itm_tmp)) {
                iter->second.assertCriteriaMatch(opts);

            } else if (_findItem(iter->second.key, items_missing, itm_tmp)) {
                ASSERT_EQ(missing_err, iter->second.resp.ctx.rc);

            } else {
                ASSERT_STREQ("", "Key not in missing or OK list");
            }
        }

        // Finally, make sure they're all there

        for (typename T::const_iterator iter = items_ok.begin(); iter != items_ok.end(); iter++) {
            ASSERT_TRUE(kmap.find(iter->key) != kmap.end());
        }

        for (typename T::const_iterator iter = items_missing.begin(); iter != items_missing.end(); iter++) {
            ASSERT_TRUE(kmap.find(iter->key) != kmap.end());
        }
    }

    unsigned counter;
    map< string, DurabilityOperation > kmap;
};

extern "C" {
static void defaultDurabilityCallback(lcb_INSTANCE *, int, const lcb_RESPENDURE *res)
{
    ((DurabilityOperation *)res->cookie)->assign(res);
}
static void multiDurabilityCallback(lcb_INSTANCE *, int, const lcb_RESPENDURE *res)
{
    ((DurabilityMultiOperation *)res->cookie)->assign(res);
}
}

TEST_F(DurabilityUnitTest, testInvalidCriteria)
{
    /**
     * We don't schedule stuff to the network here
     */
    HandleWrap hwrap;
    createConnection(hwrap);

    lcb_INSTANCE *instance = hwrap.getLcb();

    lcb_durability_opts_t opts = {0};
    defaultOptions(instance, opts);
    opts.v.v0.persist_to = 10;
    opts.v.v0.replicate_to = 100;
    opts.v.v0.cap_max = 0;

    lcb_MULTICMD_CTX *mctx;
    lcb_STATUS err = LCB_SUCCESS;
    mctx = lcb_endure3_ctxnew(instance, &opts, &err);
    ASSERT_EQ(err, LCB_ERR_DURABILITY_TOO_MANY);
    ASSERT_EQ((lcb_MULTICMD_CTX *)NULL, mctx);
}

/**
 * Test various criteria for durability
 */
TEST_F(DurabilityUnitTest, testDurabilityCriteria)
{
    HandleWrap hwrap;
    lcb_INSTANCE *instance;

    createConnection(hwrap);
    instance = hwrap.getLcb();

    lcb_durability_opts_st opts = {0};

    /** test with no persist/replicate */
    defaultOptions(instance, opts);

    opts.v.v0.replicate_to = 0;
    opts.v.v0.persist_to = 0;

    lcb_MULTICMD_CTX *mctx;
    lcb_STATUS err = LCB_SUCCESS;
    mctx = lcb_endure3_ctxnew(instance, &opts, &err);
    ASSERT_EQ(err, LCB_ERR_INVALID_ARGUMENT);
    ASSERT_EQ((lcb_MULTICMD_CTX *)NULL, mctx);
}

/**
 * @test Test several 'basic' durability functions
 *
 * @pre Store a key. Perform a durability check with master-only persistence
 * (i.e. persist_to = 1, replicate_to = 0)
 * @post Operation succeeds
 *
 * @pre Check the key against 'maximum possible' durability by estimating the
 * maximum replica/server count
 *
 * @post Operation succeeds
 *
 * @pre Set the durability options to a very large criteria, but set the
 * @c cap_max flag so the API will reduce it to a sane default. Then use it
 * for a durability check
 *
 * @post the response is successful
 */
TEST_F(DurabilityUnitTest, testSimpleDurability)
{
    /** need real cluster for durability tests */
    LCB_TEST_REQUIRE_FEATURE("observe");
    SKIP_UNLESS_MOCK();

    HandleWrap hwrap;
    lcb_INSTANCE *instance;

    Item kv = Item("a_key", "a_value", 0);
    createConnection(hwrap);
    instance = hwrap.getLcb();

    removeKey(instance, kv.key);

    KVOperation kvo = KVOperation(&kv);
    kvo.store(instance);

    // Now wait for it to persist
    lcb_durability_opts_t opts;
    memset(&opts, 0, sizeof(opts));
    opts.v.v0.pollopts = LCB_DURABILITY_MODE_SEQNO;
    opts.v.v0.persist_to = 1;
    opts.v.v0.replicate_to = 0;

    kvo = KVOperation(&kv);
    kvo.get(instance);

    DurabilityOperation dop;
    dop.run(instance, &opts, kvo.result);

    dop.assertCriteriaMatch(opts);
    ASSERT_STREQ(kv.key.c_str(), dop.key.c_str());

    // Try with more expanded criteria
    defaultOptions(instance, opts);
    dop = DurabilityOperation();
    dop.run(instance, &opts, kvo.result);
    dop.assertCriteriaMatch(opts);

    // Make the options to some absurd number. Ensure it's capped!
    opts.v.v0.persist_to = 100;
    opts.v.v0.replicate_to = 100;
    opts.v.v0.cap_max = 1;

    dop = DurabilityOperation();
    dop.run(instance, &opts, kvo.result);
    defaultOptions(instance, opts);
    dop.assertCriteriaMatch(opts);
}

/**
 * @test Durability checks against non-existent keys
 * @pre Remove a key, and perform a durability check against it
 * @post Operation fails with @c LCB_ERR_DOCUMENT_NOT_FOUND
 */
TEST_F(DurabilityUnitTest, testNonExist)
{
    LCB_TEST_REQUIRE_FEATURE("observe");
    SKIP_UNLESS_MOCK();

    lcb_INSTANCE *instance;
    HandleWrap hwrap;

    string key = "non-exist-key";

    createConnection(hwrap);
    instance = hwrap.getLcb();

    removeKey(instance, key);

    Item itm = Item(key, "", 0);

    DurabilityOperation dop;
    lcb_durability_opts_t opts = {0};
    opts.v.v0.timeout = SECS_USECS(2);

    defaultOptions(instance, opts);

    opts.version = 1;
    opts.v.v0.pollopts = LCB_DURABILITY_MODE_SEQNO;

    dop.run(instance, &opts, itm, LCB_ERR_DURABILITY_NO_MUTATION_TOKENS);
}

/**
 * @test Test negative durability (Delete)
 *
 * @pre Store a key, Remove it, perform a durability check against the key,
 * using the @c check_delete flag
 *
 * @post A positive reply is received indicating the item has been deleted
 *
 * @pre Store the key, but don't remove it. Perform a durability check against
 * the key using the delete flag
 *
 * @post Operation is returned with @c LCB_ERR_TIMEOUT
 */
TEST_F(DurabilityUnitTest, testDelete)
{
    LCB_TEST_REQUIRE_FEATURE("observe");
    SKIP_UNLESS_MOCK();

    HandleWrap hwrap;
    lcb_INSTANCE *instance;
    lcb_durability_opts_t opts = {0};
    string key = "deleted-key";
    createConnection(hwrap);
    instance = hwrap.getLcb();

    storeKey(instance, key, "value");

    Item itm = Item(key, "value", 0);
    KVOperation kvo = KVOperation(&itm);
    kvo.remove(instance);

    // Ensure the key is actually purged!
    MockMutationCommand mcmd(MockCommand::PURGE, key);
    mcmd.onMaster = true;
    mcmd.replicaCount = lcb_get_num_replicas(instance);
    doMockTxn(mcmd);

    defaultOptions(instance, opts);
    opts.v.v0.check_delete = 1;
    DurabilityOperation dop;
    dop.run(instance, &opts, itm);
    dop.assertCriteriaMatch(opts);

    kvo.clear();
    kvo.request = &itm;
    kvo.store(instance);

    opts.v.v0.timeout = SECS_USECS(1);

    opts.version = 1;
    opts.v.v0.pollopts = LCB_DURABILITY_MODE_SEQNO;
    dop = DurabilityOperation();
    dop.run(instance, &opts, itm);
    ASSERT_EQ(LCB_SUCCESS, dop.resp.ctx.rc);
}

/**
 * @test Test behavior when a key is modified (exists with a different CAS)
 *
 * @pre Store a key. Store it again. Keep the CAS from the first store as the
 * stale CAS. Keep the current CAS as well.
 *
 * @pre Perform a durability check against the stale CAS
 * @post Operation fails with @c LCB_ERR_DOCUMENT_EXISTS
 *
 * @pre Perform a durability check against the new CAS
 * @post Operation succeeds
 */
TEST_F(DurabilityUnitTest, testModified)
{
    LCB_TEST_REQUIRE_FEATURE("observe");

    HandleWrap hwrap;
    lcb_INSTANCE *instance;
    lcb_durability_opts_t opts = {0};
    string key = "mutated-key";
    Item itm = Item(key, key);
    KVOperation kvo_cur(&itm), kvo_stale(&itm);

    createConnection(hwrap);
    instance = hwrap.getLcb();

    kvo_stale.store(instance);
    kvo_cur.store(instance);

    kvo_stale.result.val = kvo_cur.result.val = key;

    defaultOptions(instance, opts);

    opts.version = 1;
    opts.v.v0.pollopts = LCB_DURABILITY_MODE_SEQNO;
    DurabilityOperation dop;
    dop.run(instance, &opts, kvo_stale.result);
    ASSERT_EQ(LCB_SUCCESS, dop.resp.ctx.rc);
}

/**
 * @test Test with very quick timeouts
 * @pre Schedule an operation with an interval of 2 usec and a timeout of
 * 5 usec
 *
 * @post Operation returns with LCB_ERR_TIMEOUT
 */
TEST_F(DurabilityUnitTest, testQuickTimeout)
{
    LCB_TEST_REQUIRE_FEATURE("observe");
    lcb_INSTANCE *instance;
    HandleWrap hwrap;
    lcb_durability_opts_t opts = {0};
    string key = "a_key";

    createConnection(hwrap);
    instance = hwrap.getLcb();

    Item itm = Item(key, key);
    KVOperation(&itm).store(instance);

    defaultOptions(instance, opts);

    /* absurd */
    opts.v.v0.timeout = 5;
    opts.v.v0.interval = 2;

    for (unsigned ii = 0; ii < 10; ii++) {
        DurabilityOperation dop;
        dop.run(instance, &opts, itm);
        ASSERT_EQ(LCB_ERR_TIMEOUT, dop.resp.ctx.rc);
    }
}

/**
 * @test Test a durability request for multiple keys
 *
 * @pre Store ten keys, and check that they exist all at once
 * @post all ten keys are received in the response, and they're ok
 */
TEST_F(DurabilityUnitTest, testMulti)
{
    LCB_TEST_REQUIRE_FEATURE("observe");
    unsigned ii;
    const unsigned limit = 10;

    vector< Item > items_stored;

    HandleWrap hwrap;
    lcb_INSTANCE *instance;

    createConnection(hwrap);
    instance = hwrap.getLcb();
    // Set the timeout to something high. For some reason this gives problem
    // on a real cluster
    lcb_cntl_setu32(instance, LCB_CNTL_DURABILITY_TIMEOUT, LCB_MS2US(10000));

    for (ii = 0; ii < limit; ii++) {
        char buf[64];
        sprintf(buf, "key-stored-%u", ii);
        string key_stored = buf;

        removeKey(instance, key_stored);

        Item itm_e = Item(key_stored, key_stored, 0);

        KVOperation kvo(&itm_e);
        kvo.store(instance);
        items_stored.push_back(kvo.result);
    }

    lcb_durability_opts_t opts = {0};
    defaultOptions(instance, opts);
    opts.version = 1;
    opts.v.v0.pollopts = LCB_DURABILITY_MODE_SEQNO;

    /**
     * Create the command..
     */
    DurabilityMultiOperation dmop = DurabilityMultiOperation();
    dmop.run(instance, &opts, items_stored);
    dmop.assertAllMatch(opts, items_stored, vector< Item >());
}

struct cb_cookie {
    int is_observe;
    int count;
};

extern "C" {
static void dummyObserveCallback(lcb_INSTANCE *, lcb_CALLBACK_TYPE, const lcb_RESPOBSERVE *resp)
{
    struct cb_cookie *c = (struct cb_cookie *)resp->cookie;
    ASSERT_EQ(1, c->is_observe);
    c->count++;
}

static void dummyDurabilityCallback(lcb_INSTANCE *, lcb_CALLBACK_TYPE, const lcb_RESPENDURE *resp)
{
    struct cb_cookie *c = (struct cb_cookie *)resp->cookie;
    ASSERT_EQ(0, c->is_observe);
    c->count++;
}
}

/**
 * @test Ensure basic observe functions as normal
 *
 * @pre pair up two batched commands, one a durability command, and one a
 * primitive observe. Set up distinct callbacks for the two (both of which
 * touch a counter, one incrementing and one decrementing an int*).
 * Wait for the operations to complete via @c lcb_wait
 *
 * @post The durability counter is decremented, observe counter incremented
 */
TEST_F(DurabilityUnitTest, testObserveSanity)
{
    LCB_TEST_REQUIRE_FEATURE("observe");
    HandleWrap handle;
    lcb_INSTANCE *instance;
    createConnection(handle);
    instance = handle.getLcb();
    lcb_STATUS err;

    lcb_install_callback(instance, LCB_CALLBACK_ENDURE, (lcb_RESPCALLBACK)dummyDurabilityCallback);
    lcb_install_callback(instance, LCB_CALLBACK_OBSERVE, (lcb_RESPCALLBACK)dummyObserveCallback);

    storeKey(instance, "key", "value");

    struct cb_cookie o_cookie = {1, 0};
    {
        lcb_MULTICMD_CTX *mctx = lcb_observe3_ctxnew(instance);
        ASSERT_NE((lcb_MULTICMD_CTX *)NULL, mctx);
        lcb_CMDOBSERVE cmd = {0};
        LCB_CMD_SET_KEY(&cmd, "key", 3);
        ASSERT_EQ(LCB_SUCCESS, mctx->addcmd(mctx, (lcb_CMDBASE *)&cmd));
        ASSERT_EQ(LCB_SUCCESS, mctx->done(mctx, &o_cookie));
    }

    struct cb_cookie d_cookie = {0, 0};
    {
        lcb_durability_opts_t opts = {0};
        defaultOptions(instance, opts);

        lcb_STATUS err = LCB_SUCCESS;
        lcb_MULTICMD_CTX *mctx = lcb_endure3_ctxnew(instance, &opts, &err);
        ASSERT_EQ(LCB_SUCCESS, err);
        ASSERT_NE((lcb_MULTICMD_CTX *)NULL, mctx);
        lcb_CMDENDURE cmd = {0};
        LCB_CMD_SET_KEY(&cmd, "key", 3);
        ASSERT_EQ(LCB_SUCCESS, mctx->addcmd(mctx, (lcb_CMDBASE *)&cmd));
        ASSERT_EQ(LCB_SUCCESS, mctx->done(mctx, &d_cookie));
    }

    ASSERT_EQ(LCB_SUCCESS, lcb_wait(instance, LCB_WAIT_DEFAULT));

    ASSERT_GT(o_cookie.count, 0);
    ASSERT_GT(d_cookie.count, 0);
}

TEST_F(DurabilityUnitTest, testMasterObserve)
{
    LCB_TEST_REQUIRE_FEATURE("observe");
    SKIP_UNLESS_MOCK();

    HandleWrap handle;
    createConnection(handle);
    lcb_INSTANCE *instance = handle.getLcb();

    lcb_install_callback(instance, LCB_CALLBACK_OBSERVE, (lcb_RESPCALLBACK)dummyObserveCallback);

    struct cb_cookie o_cookie = {1, 0};
    lcb_MULTICMD_CTX *mctx = lcb_observe3_ctxnew(instance);
    ASSERT_NE((lcb_MULTICMD_CTX *)NULL, mctx);
    lcb_CMDOBSERVE cmd = {0};
    cmd.cmdflags |= LCB_CMDOBSERVE_F_MASTER_ONLY;
    LCB_CMD_SET_KEY(&cmd, "key", 3);
    ASSERT_EQ(LCB_SUCCESS, mctx->addcmd(mctx, (lcb_CMDBASE *)&cmd));
    ASSERT_EQ(LCB_SUCCESS, mctx->done(mctx, &o_cookie));
    lcb_wait(instance, LCB_WAIT_DEFAULT);

    // 2 == one for the callback, one for the NULL
    ASSERT_EQ(2, o_cookie.count);
}

extern "C" {
static void fo_callback(void *cookie)
{
    lcb_INSTANCE *instance = (lcb_INSTANCE *)cookie;
    MockEnvironment *mock = MockEnvironment::getInstance();
    for (int ii = 1; ii < mock->getNumNodes(); ii++) {
        mock->failoverNode(ii);
    }
    lcb_loop_unref(instance);
}
}

/**
 * Test the functionality of durability operations during things like
 * node failovers.
 *
 * The idea behind here is to ensure that we can trigger a case where a series
 * of OBSERVE packets are caught in the middle of a cluster update and end up
 * being relocated to the same server. Previously (and currently) this would
 * confuse the lookup_server_with_command functionality which would then invoke
 * the 'NULL' callback multiple times (because it assumes it's not located
 * anywhere else)
 */
TEST_F(DurabilityUnitTest, testDurabilityRelocation)
{
    SKIP_UNLESS_MOCK();

    // Disable CCCP so that we get streaming updates
    MockEnvironment *mock = MockEnvironment::getInstance();
    mock->setCCCP(false);

    HandleWrap handle;
    lcb_INSTANCE *instance;
    createConnection(handle);
    instance = handle.getLcb();

    lcb_install_callback(instance, LCB_CALLBACK_ENDURE, (lcb_RESPCALLBACK)dummyDurabilityCallback);

    std::string key = "key";
    lcb_durability_opts_t opts = {0};
    opts.v.v0.persist_to = 100;
    opts.v.v0.replicate_to = 100;
    opts.v.v0.cap_max = 1;
    storeKey(instance, key, "value");

    // Ensure we have to resend commands multiple times
    MockMutationCommand mcmd(MockCommand::UNPERSIST, key);
    mcmd.onMaster = true;
    mcmd.replicaCount = lcb_get_num_replicas(instance);
    doMockTxn(mcmd);

    /**
     * Failover all but one node
     */
    for (int ii = 1; ii < mock->getNumNodes(); ii++) {
        mock->hiccupNodes(1000, 0);
    }
    lcbio_pTIMER tm = lcbio_timer_new(handle.getLcb()->iotable, instance, fo_callback);
    lcbio_timer_rearm(tm, 500000);
    lcb_loop_ref(instance);

    lcb_STATUS err = LCB_SUCCESS;
    lcb_MULTICMD_CTX *mctx = lcb_endure3_ctxnew(instance, &opts, &err);
    ASSERT_EQ(LCB_SUCCESS, err);
    ASSERT_NE((lcb_MULTICMD_CTX *)NULL, mctx);
    lcb_CMDENDURE cmd = {0};
    LCB_CMD_SET_KEY(&cmd, key.c_str(), key.size());
    err = mctx->addcmd(mctx, (lcb_CMDBASE *)&cmd);
    ASSERT_EQ(LCB_SUCCESS, err);

    struct cb_cookie cookie = {0, 0};
    ASSERT_EQ(LCB_SUCCESS, mctx->done(mctx, &cookie));

    lcb_wait(instance, LCB_WAIT_DEFAULT);
    lcbio_timer_destroy(tm);
    ASSERT_EQ(1, cookie.count);
}

TEST_F(DurabilityUnitTest, testMissingSynctoken)
{
    HandleWrap hw;
    lcb_INSTANCE *instance;
    createConnection(hw, &instance);

    if (!supportsMutationTokens(instance)) {
        return;
    }

    std::string("nonexist-key");
    lcb_STATUS rc;
    lcb_MULTICMD_CTX *mctx;
    lcb_durability_opts_t options = {0};
    defaultOptions(instance, options);
    options.version = 1;
    options.v.v0.pollopts = LCB_DURABILITY_MODE_SEQNO;

    mctx = lcb_endure3_ctxnew(instance, &options, &rc);
    ASSERT_FALSE(mctx == NULL);
    lcb_CMDENDURE cmd = {0};
    LCB_CMD_SET_KEY(&cmd, "foo", 3);

    rc = mctx->addcmd(mctx, (lcb_CMDBASE *)&cmd);
    ASSERT_EQ(LCB_ERR_DURABILITY_NO_MUTATION_TOKENS, rc);

    mctx->fail(mctx);
}

TEST_F(DurabilityUnitTest, testExternalSynctoken)
{
    HandleWrap hw1, hw2;
    lcb_INSTANCE *instance1, *instance2;
    createConnection(hw1, &instance1);
    createConnection(hw2, &instance2);

    if (!supportsMutationTokens(instance1)) {
        return;
    }

    std::string key("hello");
    std::string value("world");
    storeKey(instance1, key, value);

    const lcb_MUTATION_TOKEN *ss;
    lcb_KEYBUF kb;
    lcb_STATUS rc;
    LCB_KREQ_SIMPLE(&kb, key.c_str(), key.size());
    ss = lcb_get_mutation_token(instance1, &kb, &rc);
    ASSERT_FALSE(ss == NULL);
    ASSERT_TRUE(LCB_MUTATION_TOKEN_ISVALID(ss));
    ASSERT_EQ(LCB_SUCCESS, rc);

    lcb_durability_opts_t options = {0};
    lcb_CMDENDURE cmd = {0};
    defaultOptions(instance2, options);
    options.version = 1;
    options.v.v0.pollopts = LCB_DURABILITY_MODE_SEQNO;

    // Initialize the command
    LCB_CMD_SET_KEY(&cmd, key.c_str(), key.size());
    cmd.mutation_token = ss;
    cmd.cmdflags |= LCB_CMDENDURE_F_MUTATION_TOKEN;

    DurabilityOperation dop;
    dop.run(instance2, &options, cmd);
    // TODO: How to actually run this?
    ASSERT_EQ(LCB_SUCCESS, dop.resp.ctx.rc);
}

TEST_F(DurabilityUnitTest, testOptionValidation)
{
    HandleWrap hw;
    lcb_INSTANCE *instance;
    lcb_U16 persist = 0, replicate = 0;
    lcb_STATUS rc;

    createConnection(hw, &instance);

    // Validate simple mode
    persist = -1;
    replicate = -1;
    rc = lcb_durability_validate(instance, &persist, &replicate, LCB_DURABILITY_VALIDATE_CAPMAX);

    ASSERT_EQ(LCB_SUCCESS, rc);
    ASSERT_TRUE(persist > replicate);

    lcbvb_CONFIG *vbc;
    rc = lcb_cntl(instance, LCB_CNTL_GET, LCB_CNTL_VBCONFIG, &vbc);
    ASSERT_EQ(LCB_SUCCESS, rc);

    int replica_max = min(LCBVB_NREPLICAS(vbc), LCBVB_NDATASERVERS(vbc) - 1);
    int persist_max = replica_max + 1;

    ASSERT_EQ(replica_max, replicate);
    ASSERT_EQ(persist_max, persist);

    persist = 0;
    replicate = 0;
    rc = lcb_durability_validate(instance, &persist, &replicate, 0);
    ASSERT_EQ(LCB_ERR_INVALID_ARGUMENT, rc);

    persist = -1;
    replicate = -1;
    rc = lcb_durability_validate(instance, &persist, &replicate, 0);
    ASSERT_EQ(LCB_ERR_DURABILITY_TOO_MANY, rc);

    persist = persist_max;
    replicate = replica_max;
    rc = lcb_durability_validate(instance, &persist, &replicate, 0);
    ASSERT_EQ(LCB_SUCCESS, rc);
    ASSERT_EQ(persist_max, persist);
    ASSERT_EQ(replica_max, replicate);

    rc = lcb_durability_validate(instance, &persist, &replicate, LCB_DURABILITY_VALIDATE_CAPMAX);
    ASSERT_EQ(LCB_SUCCESS, rc);
    ASSERT_EQ(persist_max, persist);
    ASSERT_EQ(replica_max, replicate);
}

typedef struct {
    int store_ok;
    uint16_t npersisted;
    uint16_t nreplicated;
    lcb_STATUS rc;
} st_RESULT;

extern "C" {
static void durstoreCallback(lcb_INSTANCE *, int, const lcb_RESPSTORE *resp)
{
    st_RESULT *res;

    ASSERT_TRUE(lcb_respstore_observe_attached(resp));

    lcb_respstore_cookie(resp, (void **)&res);
    res->rc = lcb_respstore_status(resp);
    lcb_respstore_observe_stored(resp, &res->store_ok);
    lcb_respstore_observe_num_persisted(resp, &res->npersisted);
    lcb_respstore_observe_num_replicated(resp, &res->nreplicated);
}
}

TEST_F(DurabilityUnitTest, testDurStore)
{
    HandleWrap hw;
    lcb_INSTANCE *instance;
    lcb_durability_opts_t options = {0};
    createConnection(hw, &instance);
    lcb_install_callback(instance, LCB_CALLBACK_STORE, (lcb_RESPCALLBACK)durstoreCallback);

    std::string key("durStore");
    std::string value("value");

    lcb_STATUS rc;
    st_RESULT res = {0};

    lcb_CMDSTORE *cmd;
    lcb_cmdstore_create(&cmd, LCB_STORE_UPSERT);
    lcb_cmdstore_key(cmd, key.c_str(), key.size());
    lcb_cmdstore_value(cmd, value.c_str(), value.size());

    defaultOptions(instance, options);
    lcb_cmdstore_durability_observe(cmd, options.v.v0.persist_to, options.v.v0.replicate_to);
    lcb_sched_enter(instance);
    res.rc = LCB_ERR_GENERIC;
    rc = lcb_store(instance, &res, cmd);
    ASSERT_EQ(LCB_SUCCESS, rc);
    lcb_sched_leave(instance);
    lcb_wait(instance, LCB_WAIT_DEFAULT);
    lcb_cmdstore_destroy(cmd);

    ASSERT_EQ(LCB_SUCCESS, res.rc);
    ASSERT_NE(0, res.store_ok);
    ASSERT_TRUE(options.v.v0.persist_to <= res.npersisted);
    ASSERT_TRUE(options.v.v0.replicate_to <= res.nreplicated);

    lcb_cmdstore_create(&cmd, LCB_STORE_REPLACE);
    lcb_cmdstore_key(cmd, key.c_str(), key.size());
    lcb_cmdstore_value(cmd, value.c_str(), value.size());
    lcb_sched_enter(instance);
    // Try with bad criteria..
    lcb_cmdstore_durability_observe(cmd, 100, 100);
    rc = lcb_store(instance, &res, cmd);
    ASSERT_EQ(LCB_ERR_DURABILITY_TOO_MANY, rc);

    // Try with no persist/replicate options
    lcb_cmdstore_durability_observe(cmd, 0, 0);
    rc = lcb_store(instance, &res, cmd);
    ASSERT_EQ(LCB_ERR_INVALID_ARGUMENT, rc);
    lcb_sched_fail(instance);

    // CAP_MAX should be applied here
    lcb_cmdstore_durability_observe(cmd, -1, -1);
    lcb_sched_enter(instance);
    rc = lcb_store(instance, &res, cmd);
    ASSERT_EQ(LCB_SUCCESS, rc);
    lcb_sched_leave(instance);
    lcb_wait(instance, LCB_WAIT_DEFAULT);
    ASSERT_EQ(LCB_SUCCESS, res.rc);
    ASSERT_TRUE(options.v.v0.persist_to <= res.npersisted);
    ASSERT_TRUE(options.v.v0.replicate_to <= res.nreplicated);

    // Use bad CAS. we should have a clear indicator that storage failed
    lcb_cmdstore_cas(cmd, -1);
    lcb_sched_enter(instance);
    rc = lcb_store(instance, &res, cmd);
    ASSERT_EQ(LCB_SUCCESS, rc);
    lcb_sched_leave(instance);
    lcb_wait(instance, LCB_WAIT_DEFAULT);
    ASSERT_EQ(LCB_ERR_DOCUMENT_EXISTS, res.rc);
    ASSERT_EQ(0, res.store_ok);

    // Make storage succeed, but let durability fail.
    // TODO: Add Mock-specific command to disable persistence/replication
    lcb_U32 ustmo = 1; // 1 microsecond
    rc = lcb_cntl(instance, LCB_CNTL_SET, LCB_CNTL_DURABILITY_TIMEOUT, &ustmo);
    ASSERT_EQ(LCB_SUCCESS, rc);

    // Reset CAS from previous command
    lcb_cmdstore_cas(cmd, 0);
    lcb_sched_enter(instance);
    rc = lcb_store(instance, &res, cmd);
    ASSERT_EQ(LCB_SUCCESS, rc);
    lcb_sched_leave(instance);
    lcb_wait(instance, LCB_WAIT_DEFAULT);
    if (res.rc == LCB_ERR_TIMEOUT) {
        ASSERT_NE(0, res.store_ok);
    } else {
        lcb_log(LOGARGS(instance, WARN), "Test skipped because mock is too fast(!)");
    }
    lcb_cmdstore_destroy(cmd);
}

TEST_F(DurabilityUnitTest, testFailoverAndSeqno)
{
    SKIP_UNLESS_MOCK();

    // Disable CCCP so that we get streaming updates
    MockEnvironment *mock = MockEnvironment::getInstance();
    mock->setCCCP(false);

    HandleWrap hwrap;
    lcb_INSTANCE *instance;
    lcb_durability_opts_t opts = {0};
    string key = "key-failover-seqno";
    Item itm = Item(key, key);
    KVOperation kvo(&itm);

    createConnection(hwrap);
    instance = hwrap.getLcb();

    kvo.store(instance);

    defaultOptions(instance, opts);
    DurabilityOperation dop;

    /* make sure that seqno works on healthy cluster */
    opts.version = 1;
    opts.v.v0.pollopts = LCB_DURABILITY_MODE_SEQNO;
    dop = DurabilityOperation();
    dop.run(instance, &opts, kvo.result);
    ASSERT_EQ(LCB_SUCCESS, dop.resp.ctx.rc);

    /* failover all nodes but master */
    lcbvb_CONFIG *vbc;
    ASSERT_EQ(LCB_SUCCESS, lcb_cntl(instance, LCB_CNTL_GET, LCB_CNTL_VBCONFIG, &vbc));
    int vbid, srvix;
    lcbvb_map_key(vbc, key.c_str(), key.size(), &vbid, &srvix);
    for (size_t jj = 0; jj < lcbvb_get_nreplicas(vbc); jj++) {
        int rix = lcbvb_vbreplica(vbc, vbid, jj);
        mock->failoverNode(rix, "default", false);
    }

    /* make sure that client gets new configration */
    instance->bs_state->reset_last_refresh();
    instance->confmon->stop();
    instance->bootstrap(lcb::BS_REFRESH_ALWAYS);

    dop = DurabilityOperation();
    dop.run(instance, &opts, kvo.result);
    ASSERT_EQ(LCB_ERR_DURABILITY_TOO_MANY, dop.resp.ctx.rc);
}
