/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2014-2020 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "mctest.h"
#include "mc/mcreq-flush-inl.h"

class McFlush : public ::testing::Test
{
};

struct MyCookie {
    int ncalled;
    void *exp_kbuf;
    MyCookie() : ncalled(0), exp_kbuf(NULL) {}
};

extern "C" {
static void buf_free_callback(mc_PIPELINE *, const void *cookie, void *kbuf, void *vbuf)
{
    MyCookie *ck = (MyCookie *)cookie;
    EXPECT_TRUE(kbuf == ck->exp_kbuf);
    ck->ncalled++;
}
}

TEST_F(McFlush, testBasicFlush)
{
    CQWrap cq;
    PacketWrap pw;

    cq.setBufFreeCallback(buf_free_callback);
    pw.setContigKey("1234");
    ASSERT_TRUE(pw.reservePacket(&cq));

    MyCookie cookie;
    cookie.exp_kbuf = pw.pktbuf;

    pw.setCookie(&cookie);
    pw.setHeaderSize();
    pw.copyHeader();
    mcreq_enqueue_packet(pw.pipeline, pw.pkt);
    mcreq_packet_handled(pw.pipeline, pw.pkt);

    nb_IOV iovs[10];

    unsigned toFlush = mcreq_flush_iov_fill(pw.pipeline, iovs, 10, NULL);
    EXPECT_EQ(28, toFlush);
    mcreq_flush_done(pw.pipeline, 8, toFlush);

    toFlush = mcreq_flush_iov_fill(pw.pipeline, iovs, 10, NULL);
    EXPECT_EQ(20, toFlush);
    mcreq_flush_done(pw.pipeline, toFlush, toFlush);

    toFlush = mcreq_flush_iov_fill(pw.pipeline, iovs, 10, NULL);
    ASSERT_EQ(0, toFlush);
    ASSERT_EQ(1, cookie.ncalled);
}

TEST_F(McFlush, testFlushedUnhandled)
{
    CQWrap cq;
    PacketWrap pw;
    cq.setBufFreeCallback(buf_free_callback);
    pw.setContigKey("1234");

    MyCookie cookie;
    cookie.exp_kbuf = pw.pktbuf;

    ASSERT_TRUE(pw.reservePacket(&cq));
    pw.setCookie(&cookie);
    pw.setHeaderSize();
    pw.copyHeader();

    mcreq_enqueue_packet(pw.pipeline, pw.pkt);

    nb_IOV iovs[10];
    unsigned toFlush = mcreq_flush_iov_fill(pw.pipeline, iovs, 10, NULL);
    ASSERT_EQ(28, toFlush);
    mcreq_flush_done(pw.pipeline, toFlush, toFlush);

    ASSERT_EQ(0, cookie.ncalled);
    ASSERT_NE(0, pw.pkt->flags & MCREQ_F_FLUSHED);

    ASSERT_EQ(pw.pkt, mcreq_pipeline_remove(pw.pipeline, pw.pkt->opaque));
    mcreq_packet_handled(pw.pipeline, pw.pkt);
    ASSERT_EQ(1, cookie.ncalled);
}

TEST_F(McFlush, testFlushCopy)
{
    CQWrap cq;
    PacketWrap pw;
    cq.setBufFreeCallback(buf_free_callback);
    pw.setCopyKey("Hello");
    ASSERT_TRUE(pw.reservePacket(&cq));

    MyCookie cookie;
    pw.setHeaderSize();
    pw.copyHeader();
    pw.setCookie(&cookie);
    mcreq_enqueue_packet(pw.pipeline, pw.pkt);

    nb_IOV iov[10];
    unsigned int toFlush = mcreq_flush_iov_fill(pw.pipeline, iov, 10, NULL);
    mcreq_flush_done(pw.pipeline, toFlush, toFlush);
    mcreq_pipeline_remove(pw.pipeline, pw.pkt->opaque);
    mcreq_packet_handled(pw.pipeline, pw.pkt);
    ASSERT_EQ(0, cookie.ncalled);
}

TEST_F(McFlush, testMultiFlush)
{
    CQWrap cq;
    int counter = 0;
    const int nitems = 10;
    MyCookie **cookies;

    cookies = new MyCookie *[nitems];
    PacketWrap **pws = new PacketWrap *[nitems];
    cq.setBufFreeCallback(buf_free_callback);

    for (int ii = 0; ii < nitems; ii++) {
        PacketWrap *pw = new PacketWrap;
        pws[ii] = pw;
        char curkey[128];
        sprintf(curkey, "Key_%d", ii);
        pw->setContigKey(curkey);

        cookies[ii] = new MyCookie;
        cookies[ii]->exp_kbuf = pw->pktbuf;

        ASSERT_TRUE(pw->reservePacket(&cq));

        pw->setCookie(cookies[ii]);
        mcreq_enqueue_packet(pw->pipeline, pw->pkt);
        pw->setHeaderSize();
        pw->copyHeader();
        mcreq_packet_handled(pw->pipeline, pw->pkt);
        mcreq_pipeline_remove(pw->pipeline, pw->pkt->opaque);
    }

    for (unsigned ii = 0; ii < cq.npipelines; ii++) {
        mc_PIPELINE *pipeline = cq.pipelines[ii];
        nb_IOV iov[10];
        unsigned toFlush = mcreq_flush_iov_fill(pipeline, iov, 10, NULL);
        if (toFlush) {
            mcreq_flush_done(pipeline, toFlush, toFlush);
        }
    }

    for (int ii = 0; ii < nitems; ii++) {
        ASSERT_EQ(1, cookies[ii]->ncalled);
        delete cookies[ii];
        delete pws[ii];
    }
    delete[] cookies;
    delete[] pws;
}

TEST_F(McFlush, testPartialFlush)
{
    CQWrap cq;
    PacketWrap pw;
    MyCookie cookie;

    cq.setBufFreeCallback(buf_free_callback);
    pw.setContigKey("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
    ASSERT_TRUE(pw.reservePacket(&cq));
    pw.setCookie(&cookie);
    cookie.exp_kbuf = pw.pktbuf;
    pw.setHeaderSize();
    pw.copyHeader();
    mcreq_enqueue_packet(pw.pipeline, pw.pkt);

    nb_IOV iov[1];
    unsigned int toFlush = 0;
    do {
        toFlush = mcreq_flush_iov_fill(pw.pipeline, iov, 1, NULL);
        if (toFlush) {
            mcreq_flush_done(pw.pipeline, 1, toFlush);
        }
    } while (toFlush > 0);

    ASSERT_NE(0, pw.pkt->flags & MCREQ_F_FLUSHED);
    mcreq_pipeline_remove(pw.pipeline, pw.pkt->opaque);
    mcreq_packet_handled(pw.pipeline, pw.pkt);
    ASSERT_EQ(1, cookie.ncalled);
}
