/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "Exception.h"
#include "ExceptionInternal.h"
#include "Logger.h"
#include "NamenodeImpl.h"
#include "NamenodeProxy.h"
#include "StringUtil.h"

#include <string>

#include <sys/fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/file.h>

namespace Hdfs {
namespace Internal {

/*
 * Load the namenode index cached in "/tmp/<id>".
 *
 * If the cache file does not exist yet it is created and initialised
 * with 0. Every failure (cannot create, cannot open, cannot lock,
 * short read) is deliberately swallowed and reported as index 0.
 *
 * id: name of the cache file below /tmp.
 * Returns the cached index, or 0 when nothing usable was found.
 */
static uint32_t GetInitNamenodeIndex(const std::string id) {
    const std::string cacheFile = "/tmp/" + id;
    uint32_t cached = 0;

    /*
     * O_EXCL makes the create fail with EEXIST if the file is already there.
     */
    int handle = open(cacheFile.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0666);

    if (handle >= 0) {
        /*
         * we just created the file: seed it with 0 under an exclusive lock.
         */
        if (0 == flock(handle, LOCK_EX)) {
            if (write(handle, &cached, sizeof(cached)) < 0) {
                /*
                 * a failed write is only logged, the result is still 0.
                 */
                LOG(WARNING,
                    "NamenodeProxy: Failed to write current Namenode index into "
                    "cache file.");
            }

            flock(handle, LOCK_UN);
        }

        close(handle);
        return 0;
    }

    if (errno != EEXIST) {
        /*
         * could not create the file for some other reason, give up.
         */
        return 0;
    }

    /*
     * the file was already present, read the stored index from it.
     */
    handle = open(cacheFile.c_str(), O_RDONLY);

    if (handle < 0) {
        return 0;
    }

    if (0 == flock(handle, LOCK_SH)) {
        if (sizeof(cached) != read(handle, &cached, sizeof(cached))) {
            /*
             * error or short read, fall back to 0.
             */
            cached = 0;
        }

        flock(handle, LOCK_UN);
    }

    close(handle);
    return cached;
}

/*
 * Store the given namenode index in the cache file "/tmp/<id>".
 *
 * This is best effort: the file is only opened, never created, so nothing
 * is written if it does not exist. Failing to open or lock the file is
 * silently ignored; a failed write is logged as a warning.
 *
 * id:    name of the cache file below /tmp.
 * index: value to store at the beginning of the file.
 */
static void SetInitNamenodeIndex(const std::string & id, uint32_t index) {
    std::string path = "/tmp/";
    path += id;
    /*
     * try open the file for write
     */
    int fd = open(path.c_str(), O_WRONLY);

    if (fd < 0) {
        /*
         * open() reports failure with a negative value only; 0 is a valid
         * descriptor (e.g. when stdin has been closed) and must not be
         * treated as an error, otherwise it would be leaked.
         */
        return;
    }

    if (0 != flock(fd, LOCK_EX)) {
        /*
         * failed to lock
         */
        close(fd);
        return;
    }

    if (write(fd, &index, sizeof(index)) < 0) {
        LOG(WARNING,
            "NamenodeProxy: Failed to write current Namenode index into "
            "cache file.");
        /*
         * ignore the failure.
         */
    }

    flock(fd, LOCK_UN);
    close(fd);
}

/*
 * Build a proxy over the given namenodes.
 *
 * namenodeInfos: one entry per namenode; each RPC address must have the
 *                form "host:port".
 * tokenService:  stored as the cluster id; it is handed to every
 *                NamenodeImpl and names the index cache file.
 * c, a:          session configuration and RPC authentication, passed on
 *                to every NamenodeImpl.
 *
 * HA is enabled only when more than one namenode is given. In that case
 * the starting namenode is taken from the cached index.
 *
 * Throws InvalidParameter if the list is empty or an address does not
 * split into exactly two parts at ':'.
 */
NamenodeProxy::NamenodeProxy(const std::vector<NamenodeInfo> & namenodeInfos, const std::string & tokenService,
                             const SessionConfig & c, const RpcAuth & a) :
    clusterid(tokenService), currentNamenode(0) {
    if (namenodeInfos.empty()) {
        /*
         * reject an empty list up front: it would otherwise reach the
         * "% namenodeInfos.size()" below with a divisor of zero.
         */
        THROW(InvalidParameter, "Cannot create namenode proxy, no namenode is specified");
    }

    if (namenodeInfos.size() == 1) {
        enableNamenodeHA = false;
        maxNamenodeHARetry = 0;
    } else {
        enableNamenodeHA = true;
        maxNamenodeHARetry = c.getRpcMaxHaRetry();
    }

    for (size_t i = 0; i < namenodeInfos.size(); ++i) {
        std::vector<std::string> nninfo = StringSplit(namenodeInfos[i].getRpcAddr(), ":");

        if (nninfo.size() != 2) {
            THROW(InvalidParameter, "Cannot create namenode proxy, %s does not contain host or port",
                  namenodeInfos[i].getRpcAddr().c_str());
        }

        namenodes.push_back(
            shared_ptr<Namenode>(
                new NamenodeImpl(nninfo[0].c_str(), nninfo[1].c_str(), clusterid, c, a)));
    }

    if (enableNamenodeHA) {
        /*
         * the cached value may be stale or out of range, so reduce it
         * modulo the number of namenodes.
         */
        currentNamenode = GetInitNamenodeIndex(clusterid) % namenodeInfos.size();
    }
}

/*
 * Destructor; performs no explicit cleanup of its own.
 */
NamenodeProxy::~NamenodeProxy() {
}

/*
 * Return the namenode currently selected by the proxy.
 *
 * oldValue: receives the value of currentNamenode at the time of the
 *           call; callers hand it back to failoverToNextNamenode().
 *
 * Throws HdfsFileSystemClosed if no namenode is left (after close()).
 */
shared_ptr<Namenode> NamenodeProxy::getActiveNamenode(uint32_t & oldValue) {
    lock_guard<mutex> guard(mut);
    const size_t total = namenodes.size();

    if (total == 0) {
        THROW(HdfsFileSystemClosed, "NamenodeProxy is closed.");
    }

    oldValue = currentNamenode;
    const size_t slot = currentNamenode % total;
    return namenodes[slot];
}

/*
 * Advance to the next namenode and persist the new index.
 *
 * oldValue: the index the caller obtained from getActiveNamenode(). The
 *           switch only happens if it still equals currentNamenode, so
 *           that several threads failing at once advance only one step.
 */
void NamenodeProxy::failoverToNextNamenode(uint32_t oldValue) {
    lock_guard<mutex> lock(mut);

    if (namenodes.empty()) {
        /*
         * close() emptied the list while the caller's RPC was in flight.
         * Return without touching the index (the modulo below would divide
         * by zero); the next getActiveNamenode() reports the closed proxy.
         */
        return;
    }

    if (oldValue != currentNamenode) {
        //already failover in another thread.
        return;
    }

    ++currentNamenode;
    currentNamenode = currentNamenode % namenodes.size();
    SetInitNamenodeIndex(clusterid, currentNamenode);
}

/*
 * Convert a failover exception that will not be retried into an
 * HdfsRpcException carrying the same message.
 *
 * Hdfs::rethrow_if_nested(e) is called first; whatever it throws is caught
 * and NESTED_THROW raises Hdfs::HdfsRpcException with e.what() as its text
 * (presumably nesting the caught exception -- confirm against the
 * NESTED_THROW definition).
 *
 * If rethrow_if_nested() returns without throwing, the process is aborted.
 */
static void HandleHdfsFailoverException(const HdfsFailoverException & e) {
    try {
        Hdfs::rethrow_if_nested(e);
    } catch (...) {
        NESTED_THROW(Hdfs::HdfsRpcException, "%s", e.what());
    }

    // only reached when e carried nothing that could be rethrown above
    abort();
}

/*
 * NAMENODE_HA_RETRY_BEGIN() / NAMENODE_HA_RETRY_END() bracket one RPC call
 * inside a NamenodeProxy member function and must always be used as a pair.
 *
 * BEGIN opens an outer do-block holding the retry counter __count, then an
 * inner loop which, on every iteration, fetches the active namenode into
 * the local variable `namenode` (remembering its index in __oldValue) and
 * opens a try block. The code written between the two macros is the body of
 * that try block and may use `namenode`.
 */
#define NAMENODE_HA_RETRY_BEGIN() \
    do { \
        int __count = 0; \
        do { \
            uint32_t __oldValue = 0; \
            shared_ptr<Namenode> namenode =  getActiveNamenode(__oldValue); \
            try { \
                (void)0

/*
 * END closes the try block opened by NAMENODE_HA_RETRY_BEGIN().
 *
 * - If the body finishes without throwing, `break` leaves the retry loop.
 * - NameNodeStandbyException: rethrown when HA is disabled or __count
 *   (tested before it is incremented) already exceeds maxNamenodeHARetry.
 * - HdfsFailoverException: under the same condition it is handed to
 *   HandleHdfsFailoverException().
 * - Otherwise the proxy fails over to the next namenode, logs a warning
 *   and the loop runs the body again.
 */
#define NAMENODE_HA_RETRY_END() \
    break; \
    } catch (const NameNodeStandbyException & e) { \
        if (!enableNamenodeHA || __count++ > maxNamenodeHARetry) { \
            throw; \
        } \
    } catch (const HdfsFailoverException & e) { \
        if (!enableNamenodeHA || __count++ > maxNamenodeHARetry) { \
            HandleHdfsFailoverException(e); \
        } \
    } \
    failoverToNextNamenode(__oldValue); \
    LOG(WARNING, "NamenodeProxy: Failover to another Namenode."); \
    } while (true); \
    } while (0)

/*
 * Forward getBlockLocations() to the active namenode, passing all
 * arguments through unchanged. The call runs inside
 * NAMENODE_HA_RETRY_BEGIN/END, which fail over to the next namenode and
 * retry on NameNodeStandbyException or HdfsFailoverException.
 */
void NamenodeProxy::getBlockLocations(const std::string & src, int64_t offset,
                                      int64_t length, LocatedBlocks & lbs) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->getBlockLocations(src, offset, length, lbs);
    NAMENODE_HA_RETRY_END();
}

/*
 * Forward create() to the active namenode, passing all arguments through
 * unchanged. The call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail
 * over to the next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 */
void NamenodeProxy::create(const std::string & src, const Permission & masked,
                           const std::string & clientName, int flag, bool createParent,
                           short replication, int64_t blockSize) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->create(src, masked, clientName, flag, createParent, replication, blockSize);
    NAMENODE_HA_RETRY_END();
}

/*
 * Forward append() to the active namenode and return its result. The call
 * runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail over to the next
 * namenode and retry on NameNodeStandbyException or HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> >
NamenodeProxy::append(const std::string& src, const std::string& clientName) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->append(src, clientName);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> >();
}

/*
 * Forward setReplication() to the active namenode and return its result.
 * The call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail over to the
 * next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
bool NamenodeProxy::setReplication(const std::string & src, short replication) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->setReplication(src, replication);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return false;
}

/*
 * Forward setPermission() to the active namenode, passing all arguments
 * through unchanged. The call runs inside NAMENODE_HA_RETRY_BEGIN/END,
 * which fail over to the next namenode and retry on
 * NameNodeStandbyException or HdfsFailoverException.
 */
void NamenodeProxy::setPermission(const std::string & src,
                                  const Permission & permission) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->setPermission(src, permission);
    NAMENODE_HA_RETRY_END();
}

/*
 * Forward setOwner() to the active namenode, passing all arguments through
 * unchanged. The call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail
 * over to the next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 */
void NamenodeProxy::setOwner(const std::string & src,
                             const std::string & username, const std::string & groupname) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->setOwner(src, username, groupname);
    NAMENODE_HA_RETRY_END();
}

/*
 * Forward abandonBlock() to the active namenode, passing all arguments
 * through unchanged. The call runs inside NAMENODE_HA_RETRY_BEGIN/END,
 * which fail over to the next namenode and retry on
 * NameNodeStandbyException or HdfsFailoverException.
 */
void NamenodeProxy::abandonBlock(const ExtendedBlock & b,
                                 const std::string & src, const std::string & holder) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->abandonBlock(b, src, holder);
    NAMENODE_HA_RETRY_END();
}

/*
 * Forward addBlock() to the active namenode and return its result. The
 * call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail over to the
 * next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
shared_ptr<LocatedBlock> NamenodeProxy::addBlock(const std::string & src,
        const std::string & clientName, const ExtendedBlock * previous,
        const std::vector<DatanodeInfo> & excludeNodes) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->addBlock(src, clientName, previous, excludeNodes);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return shared_ptr<LocatedBlock>();
}

/*
 * Forward getAdditionalDatanode() to the active namenode and return its
 * result. The call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail
 * over to the next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
shared_ptr<LocatedBlock> NamenodeProxy::getAdditionalDatanode(
    const std::string & src, const ExtendedBlock & blk,
    const std::vector<DatanodeInfo> & existings,
    const std::vector<std::string> & storageIDs,
    const std::vector<DatanodeInfo> & excludes, int numAdditionalNodes,
    const std::string & clientName) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->getAdditionalDatanode(src, blk, existings,
                                           storageIDs, excludes, numAdditionalNodes, clientName);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return shared_ptr<LocatedBlock>();
}

/*
 * Forward complete() to the active namenode and return its result. The
 * call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail over to the
 * next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
bool NamenodeProxy::complete(const std::string & src,
                             const std::string & clientName, const ExtendedBlock * last) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->complete(src, clientName, last);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return false;
}

/*void NamenodeProxy::reportBadBlocks(const std::vector<LocatedBlock> & blocks) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->reportBadBlocks(blocks);
    NAMENODE_HA_RETRY_END();
}*/

/*
 * Forward rename() to the active namenode and return its result. The call
 * runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail over to the next
 * namenode and retry on NameNodeStandbyException or HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
bool NamenodeProxy::rename(const std::string & src, const std::string & dst) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->rename(src, dst);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return false;
}

/*
void NamenodeProxy::concat(const std::string & trg,
                           const std::vector<std::string> & srcs) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->concat(trg, srcs);
    NAMENODE_HA_RETRY_END();
}
*/

/*
 * Forward truncate() to the active namenode and return its result. The
 * call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail over to the
 * next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception. The assert
 * and the final return mirror the other value-returning wrappers so that
 * no path syntactically falls off the end of a non-void function.
 */
bool NamenodeProxy::truncate(const std::string & src, int64_t size,
                             const std::string & clientName) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->truncate(src, size, clientName);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return false;
}

/*
 * Forward getLease() to the active namenode, passing all arguments through
 * unchanged. The call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail
 * over to the next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 */
void NamenodeProxy::getLease(const std::string & src,
                             const std::string & clientName) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->getLease(src, clientName);
    NAMENODE_HA_RETRY_END();
}

/*
 * Forward releaseLease() to the active namenode, passing all arguments
 * through unchanged. The call runs inside NAMENODE_HA_RETRY_BEGIN/END,
 * which fail over to the next namenode and retry on
 * NameNodeStandbyException or HdfsFailoverException.
 */
void NamenodeProxy::releaseLease(const std::string & src,
                                 const std::string & clientName) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->releaseLease(src, clientName);
    NAMENODE_HA_RETRY_END();
}

/*
 * Forward deleteFile() to the active namenode and return its result. The
 * call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail over to the
 * next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
bool NamenodeProxy::deleteFile(const std::string & src, bool recursive) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->deleteFile(src, recursive);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return false;
}

/*
 * Forward mkdirs() to the active namenode and return its result. The call
 * runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail over to the next
 * namenode and retry on NameNodeStandbyException or HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
bool NamenodeProxy::mkdirs(const std::string & src, const Permission & masked,
                           bool createParent) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->mkdirs(src, masked, createParent);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return false;
}

/*
 * Forward getListing() to the active namenode and return its result. The
 * call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail over to the
 * next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
bool NamenodeProxy::getListing(const std::string & src,
                               const std::string & startAfter, bool needLocation,
                               std::vector<FileStatus> & dl) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->getListing(src, startAfter, needLocation, dl);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return false;
}

/*
 * Forward renewLease() to the active namenode, passing the argument
 * through unchanged. The call runs inside NAMENODE_HA_RETRY_BEGIN/END,
 * which fail over to the next namenode and retry on
 * NameNodeStandbyException or HdfsFailoverException.
 */
void NamenodeProxy::renewLease(const std::string & clientName) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->renewLease(clientName);
    NAMENODE_HA_RETRY_END();
}

/*bool NamenodeProxy::recoverLease(const std::string & src,
                                 const std::string & clientName) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->recoverLease(src, clientName);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return false;
}*/

/*
 * Forward getFsStats() to the active namenode and return its result. The
 * call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail over to the
 * next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
std::vector<int64_t> NamenodeProxy::getFsStats() {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->getFsStats();
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return std::vector<int64_t>();
}

/*void NamenodeProxy::metaSave(const std::string & filename) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->metaSave(filename);
    NAMENODE_HA_RETRY_END();
}*/

/*
 * Forward getFileInfo() to the active namenode and return its result. The
 * call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail over to the
 * next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
FileStatus NamenodeProxy::getFileInfo(const std::string & src, bool *exist) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->getFileInfo(src, exist);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return FileStatus();
}

/*FileStatus NamenodeProxy::getFileLinkInfo(const std::string & src) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->getFileLinkInfo(src);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return FileStatus();
}*/

/*ContentSummary NamenodeProxy::getContentSummary(const std::string & path) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->getContentSummary(path);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return ContentSummary();
}*/

/*void NamenodeProxy::setQuota(const std::string & path, int64_t namespaceQuota,
                             int64_t diskspaceQuota) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->setQuota(path, namespaceQuota, diskspaceQuota);
    NAMENODE_HA_RETRY_END();
}*/

/*
 * Forward fsync() to the active namenode, passing all arguments through
 * unchanged. The call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail
 * over to the next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 */
void NamenodeProxy::fsync(const std::string & src, const std::string & client) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->fsync(src, client);
    NAMENODE_HA_RETRY_END();
}

/*
 * Forward setTimes() to the active namenode, passing all arguments through
 * unchanged. The call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail
 * over to the next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 */
void NamenodeProxy::setTimes(const std::string & src, int64_t mtime,
                             int64_t atime) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->setTimes(src, mtime, atime);
    NAMENODE_HA_RETRY_END();
}

/*void NamenodeProxy::createSymlink(const std::string & target,
                                  const std::string & link, const Permission & dirPerm,
                                  bool createParent) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->createSymlink(target, link, dirPerm, createParent);
    NAMENODE_HA_RETRY_END();
}*/

/*std::string NamenodeProxy::getLinkTarget(const std::string & path) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->getLinkTarget(path);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return "";
}*/

/*
 * Forward updateBlockForPipeline() to the active namenode and return its
 * result. The call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail
 * over to the next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
shared_ptr<LocatedBlock> NamenodeProxy::updateBlockForPipeline(
    const ExtendedBlock & block, const std::string & clientName) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->updateBlockForPipeline(block, clientName);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return shared_ptr<LocatedBlock>();
}

/*
 * Forward updatePipeline() to the active namenode, passing all arguments
 * through unchanged. The call runs inside NAMENODE_HA_RETRY_BEGIN/END,
 * which fail over to the next namenode and retry on
 * NameNodeStandbyException or HdfsFailoverException.
 */
void NamenodeProxy::updatePipeline(const std::string & clientName,
                                   const ExtendedBlock & oldBlock, const ExtendedBlock & newBlock,
                                   const std::vector<DatanodeInfo> & newNodes,
                                   const std::vector<std::string> & storageIDs) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->updatePipeline(clientName, oldBlock, newBlock, newNodes, storageIDs);
    NAMENODE_HA_RETRY_END();
}

/*
 * Forward getDelegationToken() to the active namenode and return its
 * result. The call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail
 * over to the next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
Token NamenodeProxy::getDelegationToken(const std::string & renewer) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->getDelegationToken(renewer);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return Token();
}

/*
 * Forward renewDelegationToken() to the active namenode and return its
 * result. The call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail
 * over to the next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
int64_t NamenodeProxy::renewDelegationToken(const Token & token) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->renewDelegationToken(token);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return 0;
}

/*
 * Forward cancelDelegationToken() to the active namenode, passing the
 * argument through unchanged. The call runs inside
 * NAMENODE_HA_RETRY_BEGIN/END, which fail over to the next namenode and
 * retry on NameNodeStandbyException or HdfsFailoverException.
 */
void NamenodeProxy::cancelDelegationToken(const Token & token) {
    NAMENODE_HA_RETRY_BEGIN();
    namenode->cancelDelegationToken(token);
    NAMENODE_HA_RETRY_END();
}

void NamenodeProxy::close() {
    lock_guard<mutex> lock(mut);
    namenodes.clear();
}

/*
 * Forward createEncryptionZone() to the active namenode and return its
 * result. The call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail
 * over to the next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
bool NamenodeProxy::createEncryptionZone(const std::string & src, const std::string & keyName) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->createEncryptionZone(src, keyName);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return false;
}

/*
 * Forward getEncryptionZoneInfo() to the active namenode and return its
 * result. The call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail
 * over to the next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
EncryptionZoneInfo NamenodeProxy::getEncryptionZoneInfo(const std::string & src, bool *exist) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->getEncryptionZoneInfo(src, exist);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return EncryptionZoneInfo();
}

/*
 * Forward listEncryptionZones() to the active namenode and return its
 * result. The call runs inside NAMENODE_HA_RETRY_BEGIN/END, which fail
 * over to the next namenode and retry on NameNodeStandbyException or
 * HdfsFailoverException.
 *
 * The retry loop is only left by returning or by an exception, so the
 * assert and the final return are not expected to execute.
 */
bool NamenodeProxy::listEncryptionZones(const int64_t id, std::vector<EncryptionZoneInfo> & ezl) {
    NAMENODE_HA_RETRY_BEGIN();
    return namenode->listEncryptionZones(id, ezl);
    NAMENODE_HA_RETRY_END();
    assert(!"should not reach here");
    return false;
}

}
}
