/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "Atomic.h"
#include "BlockLocation.h"
#include "DirectoryIterator.h"
#include "EncryptionZoneIterator.h"
#include "Exception.h"
#include "ExceptionInternal.h"
#include "FileStatus.h"
#include "FileSystemImpl.h"
#include "FileSystemStats.h"
#include "EncryptionZoneInfo.h"
#include "InputStream.h"
#include "LeaseRenewer.h"
#include "Logger.h"
#include "OutputStream.h"
#include "OutputStreamImpl.h"
#include "server/LocatedBlocks.h"
#include "server/NamenodeInfo.h"
#include "server/NamenodeProxy.h"
#include "StringUtil.h"

#include <cstring>
#include <inttypes.h>
#include <libxml/uri.h>
#include <strings.h>

namespace Hdfs {
namespace Internal {

static const std::string GetAbsPath(const std::string & prefix,
                                    const std::string & path) {
    if (path.empty()) {
        return prefix;
    }

    if ('/' == path[0]) {
        return path;
    } else {
        return prefix + "/" + path;
    }
}

/*
 * Return the canonical absolute name of file NAME.
 * A canonical name does not contain any `.', `..' components nor any repeated path separators ('/')
 */
static const std::string CanonicalizePath(const std::string & path) {
    int skip = 0;
    std::string retval;
    std::vector<std::string> components = StringSplit(path, "/");
    std::deque<std::string> tmp;
    std::vector<std::string>::reverse_iterator s = components.rbegin();

    while (s != components.rend()) {
        if (s->empty() || *s == ".") {
            ++s;
        } else if (*s == "..") {
            ++skip;
            ++s;
        } else {
            if (skip <= 0) {
                tmp.push_front(*s);
            } else {
                --skip;
            }

            ++s;
        }
    }

    for (size_t i = 0; i < tmp.size(); ++i) {
        retval += "/";
        retval += tmp[i];
    }

    return retval.empty() ? "/" : retval;
}

FileSystemImpl::FileSystemImpl(const FileSystemKey& key, const Config& c)
    : conf(c),
      key(key),
      openedOutputStream(0),
      nn(NULL),
      sconf(c),
      user(key.getUser()) {
    static atomic<uint32_t> count(0);
    std::stringstream ss;
    ss.imbue(std::locale::classic());
    srand((unsigned int) time(NULL));
    ss << "libhdfs3_client_random_" << rand() << "_count_" << ++count << "_pid_"
       << getpid() << "_tid_" << pthread_self();
    clientName = ss.str();
    workingDir = std::string("/user/") + user.getEffectiveUser();
    peerCache = shared_ptr<PeerCache>(new PeerCache(sconf));
#ifdef MOCK
    stub = NULL;
#endif
    //set log level
    RootLogger.setLogSeverity(sconf.getLogSeverity());
}

/**
 * Destroy a FileSystemBase instance
 */
FileSystemImpl::~FileSystemImpl() {
    try {
        disconnect();
    } catch (...) {
    }
}

const std::string FileSystemImpl::getStandardPath(const char * path) {
    std::string base;
    {
        lock_guard<mutex> lock(mutWorkingDir);
        base = workingDir;
    }
    return CanonicalizePath(GetAbsPath(base, path));
}

const char * FileSystemImpl::getClientName() {
    return clientName.c_str();
}

void FileSystemImpl::connect() {
    std::string host, port, uri;
    std::vector<NamenodeInfo> namenodeInfos;

    if (nn) {
        THROW(HdfsIOException, "FileSystemImpl: already connected.");
    }

    host = key.getHost();
    port = key.getPort();
    uri += key.getScheme() + "://" + host;

    if (port.empty()) {
        try {
            namenodeInfos = NamenodeInfo::GetHANamenodeInfo(key.getHost(), conf);
        } catch (const HdfsConfigNotFound & e) {
            NESTED_THROW(InvalidParameter, "Cannot parse URI: %s, missing port or invalid HA configuration", uri.c_str());
        }

        tokenService = "ha-hdfs:";
        tokenService += host;
    } else {
        std::stringstream ss;
        ss.imbue(std::locale::classic());
        ss << host << ":" << port;
        namenodeInfos.resize(1);
        namenodeInfos[0].setRpcAddr(ss.str());
        tokenService = namenodeInfos[0].getRpcAddr();
    }

#ifdef MOCK
    nn = stub->getNamenode();
#else
    nn = new NamenodeProxy(namenodeInfos, tokenService, sconf, RpcAuth(user, RpcAuth::ParseMethod(sconf.getRpcAuthMethod())));
#endif
    /*
     * To test if the connection is ok
     */
    getFsStats();
}

/**
 * disconnect from hdfs
 */
void FileSystemImpl::disconnect() {
    if (nn) {
        nn->close();
        delete nn;
    }

    nn = NULL;
}

/**
 * To get default number of replication.
 * @return the default number of replication.
 */
int FileSystemImpl::getDefaultReplication() const {
    return sconf.getDefaultReplica();
}

/**
 * To get the default number of block size.
 * @return the default block size.
 */
int64_t FileSystemImpl::getDefaultBlockSize() const {
    return sconf.getDefaultBlockSize();
}

/**
 * To get the home directory.
 * @return home directory.
 */
std::string FileSystemImpl::getHomeDirectory() const {
    return std::string("/user/") + user.getEffectiveUser();
}

/**
 * To delete a file or directory.
 * @param path the path to be deleted.
 * @param recursive if path is a directory, delete the contents recursively.
 * @return return true if success.
 */

bool FileSystemImpl::deletePath(const char * path, bool recursive) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    return nn->deleteFile(getStandardPath(path), recursive);
}

/**
 * To create a directory which given permission.
 * @param path the directory path which is to be created.
 * @param permission directory permission.
 * @return return true if success.
 */

bool FileSystemImpl::mkdir(const char * path, const Permission & permission) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    return nn->mkdirs(getStandardPath(path), permission, false);
}

/**
 * To create a directory which given permission.
 * If parent path does not exits, create it.
 * @param path the directory path which is to be created.
 * @param permission directory permission.
 * @return return true if success.
 */

bool FileSystemImpl::mkdirs(const char * path, const Permission & permission) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    return nn->mkdirs(getStandardPath(path), permission, true);
}

/**
 * To get path information.
 * @param path the path which information is to be returned.
 * @return the path information.
 */
FileStatus FileSystemImpl::getFileStatus(const char * path) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    return nn->getFileInfo(getStandardPath(path), NULL);
}

static void Convert(BlockLocation & bl, const LocatedBlock & lb) {
    const std::vector<DatanodeInfo> & nodes = lb.getLocations();
    bl.setCorrupt(lb.isCorrupt());
    bl.setLength(lb.getNumBytes());
    bl.setOffset(lb.getOffset());
    std::vector<std::string> hosts(nodes.size());
    std::vector<std::string> names(nodes.size());
    std::vector<std::string> topologyPaths(nodes.size());

    for (size_t i = 0 ; i < nodes.size() ; ++i) {
        hosts[i] = nodes[i].getHostName();
        names[i] = nodes[i].getXferAddr();
        topologyPaths[i] = nodes[i].getLocation() + '/' + nodes[i].getXferAddr();
    }

    bl.setNames(names);
    bl.setHosts(hosts);
    bl.setTopologyPaths(topologyPaths);
}

std::vector<BlockLocation> FileSystemImpl::getFileBlockLocations(
    const char * path, int64_t start, int64_t len) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    if (start < 0) {
        THROW(InvalidParameter, "Invalid input: start offset should be positive");
    }

    if (len < 0) {
        THROW(InvalidParameter, "Invalid input: length should be positive");
    }

    LocatedBlocksImpl lbs;
    nn->getBlockLocations(getStandardPath(path), start, len, lbs);
    std::vector<LocatedBlock> blocks = lbs.getBlocks();
    std::vector<BlockLocation> retval(blocks.size());

    for (size_t i = 0; i < blocks.size(); ++i) {
        Convert(retval[i], blocks[i]);
    }

    return retval;
}

/**
 * list the contents of a directory.
 * @param path the directory path.
 * @return return the path informations in the given directory.
 */
DirectoryIterator FileSystemImpl::listDirectory(const char * path,
        bool needLocation) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    return DirectoryIterator(this, getStandardPath(path), needLocation);
}

/**
 * list all the contents of a directory.
 * @param path The directory path.
 * @return Return a vector of file informations in the directory.
 */
std::vector<FileStatus> FileSystemImpl::listAllDirectoryItems(const char * path,
        bool needLocation) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    std::string startAfter;
    std::string p = getStandardPath(path);
    std::vector<FileStatus> retval;

    while (getListing(p, startAfter, needLocation, retval)) {
        startAfter = retval.back().getPath();
    }

    return retval;
}

/**
 * To set the owner and the group of the path.
 * username and groupname cannot be empty at the same time.
 * @param path the path which owner of group is to be changed.
 * @param username new user name.
 * @param groupname new group.
 */
void FileSystemImpl::setOwner(const char * path, const char * username,
                              const char * groupname) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    if ((NULL == username || !strlen(username))
            && (NULL == groupname || !strlen(groupname))) {
        THROW(InvalidParameter,
              "Invalid input: username and groupname should not be empty");
    }

    nn->setOwner(getStandardPath(path), username != NULL ? username : "",
                 groupname != NULL ? groupname : "");
}

/**
 * To set the access time or modification time of a path.
 * @param path the path which access time or modification time is to be changed.
 * @param mtime new modification time.
 * @param atime new access time.
 */
void FileSystemImpl::setTimes(const char * path, int64_t mtime, int64_t atime) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    nn->setTimes(getStandardPath(path), mtime, atime);
}

/**
 * To set the permission of a path.
 * @param path the path which permission is to be changed.
 * @param permission new permission.
 */
void FileSystemImpl::setPermission(const char * path,
                                   const Permission & permission) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    nn->setPermission(getStandardPath(path), permission);
}

/**
 * To set the number of replication.
 * @param path the path which number of replication is to be changed.
 * @param replication new number of replication.
 * @return return true if success.
 */

bool FileSystemImpl::setReplication(const char * path, short replication) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    return nn->setReplication(getStandardPath(path), replication);
}

/**
 * To rename a path.
 * @param src old path.
 * @param dst new path.
 * @return return true if success.
 */

bool FileSystemImpl::rename(const char * src, const char * dst) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == src || !strlen(src)) {
        THROW(InvalidParameter, "Invalid input: src should not be empty");
    }

    if (NULL == dst || !strlen(dst)) {
        THROW(InvalidParameter, "Invalid input: dst should not be empty");
    }

    return nn->rename(getStandardPath(src), getStandardPath(dst));
}

/**
 * To set working directory.
 * @param path new working directory.
 */
void FileSystemImpl::setWorkingDirectory(const char * path) {
    if (NULL == path) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    if (!strlen(path) || '/' != path[0]) {
        THROW(InvalidParameter,
              "Invalid input: path should be an absolute path");
    }

    lock_guard<mutex> lock(mutWorkingDir);
    workingDir = path;
}

/**
 * To get working directory.
 * @return working directory.
 */
std::string FileSystemImpl::getWorkingDirectory() const {
    return workingDir;
}

/**
 * To test if the path exist.
 * @param path the path which is to be tested.
 * @return return true if the path exist.
 */

bool FileSystemImpl::exist(const char * path) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    try {
        bool retval = true;
        nn->getFileInfo(getStandardPath(path), &retval);
        return retval;
    } catch (const FileNotFoundException & e) {
        return false;
    }

    return true;
}

/**
 * To get the file system status.
 * @return the file system status.
 */
FileSystemStats FileSystemImpl::getFsStats() {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    std::vector<int64_t> retval = nn->getFsStats();
    assert(retval.size() >= 3);
    return FileSystemStats(retval[0], retval[1], retval[2]);
}

/**
 * Truncate the file in the indicated path to the indicated size.
 * @param path The path to the file to be truncated
 * @param size The size the file is to be truncated to
 *
 * @return true if and client does not need to wait for block recovery,
 * false if client needs to wait for block recovery.
 */
bool FileSystemImpl::truncate(const char * path, int64_t size) {
    LOG(DEBUG1, "truncate file %s to length %" PRId64, path, size);

    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: src should not be empty.");
    }

    std::string absPath = getStandardPath(path);

    return nn->truncate(absPath, size, clientName);
}

std::string FileSystemImpl::getDelegationToken(const char * renewer) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == renewer || !strlen(renewer)) {
        THROW(InvalidParameter, "Invalid input: renewer should not be empty.");
    }

    Token retval = nn->getDelegationToken(renewer);
    retval.setService(tokenService);
    return retval.toString();
}

std::string FileSystemImpl::getDelegationToken() {
    return getDelegationToken(key.getUser().getPrincipal().c_str());
}

int64_t FileSystemImpl::renewDelegationToken(const std::string & token) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    Token t;
    t.fromString(token);
    return  nn->renewDelegationToken(t);
}

void FileSystemImpl::cancelDelegationToken(const std::string & token) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    Token t;
    t.fromString(token);
    nn->cancelDelegationToken(t);
}

void FileSystemImpl::getBlockLocations(const std::string & src, int64_t offset,
                                       int64_t length, LocatedBlocks & lbs) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    nn->getBlockLocations(src, offset, length, lbs);
}

void FileSystemImpl::create(const std::string & src, const Permission & masked,
                            int flag, bool createParent, short replication, int64_t blockSize) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    nn->create(src, masked, clientName, flag, createParent, replication,
               blockSize);
}

std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> >
FileSystemImpl::append(const std::string& src) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return nn->append(src, clientName);
}

void FileSystemImpl::abandonBlock(const ExtendedBlock & b,
                                  const std::string & src) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    nn->abandonBlock(b, src, clientName);
}

shared_ptr<LocatedBlock> FileSystemImpl::addBlock(const std::string & src,
        const ExtendedBlock * previous,
        const std::vector<DatanodeInfo> & excludeNodes) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return nn->addBlock(src, clientName, previous, excludeNodes);
}

shared_ptr<LocatedBlock> FileSystemImpl::getAdditionalDatanode(
    const std::string & src, const ExtendedBlock & blk,
    const std::vector<DatanodeInfo> & existings,
    const std::vector<std::string> & storageIDs,
    const std::vector<DatanodeInfo> & excludes, int numAdditionalNodes) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return nn->getAdditionalDatanode(src, blk, existings, storageIDs, excludes,
                                     numAdditionalNodes, clientName);
}

bool FileSystemImpl::complete(const std::string & src,
                              const ExtendedBlock * last) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return nn->complete(src, clientName, last);
}

/*void FileSystemImpl::reportBadBlocks(const std::vector<LocatedBlock> & blocks) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    nn->reportBadBlocks(blocks);
}*/

void FileSystemImpl::fsync(const std::string & src) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    nn->fsync(src, clientName);
}

shared_ptr<LocatedBlock> FileSystemImpl::updateBlockForPipeline(
    const ExtendedBlock & block) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return nn->updateBlockForPipeline(block, clientName);
}

void FileSystemImpl::updatePipeline(const ExtendedBlock & oldBlock,
                                    const ExtendedBlock & newBlock,
                                    const std::vector<DatanodeInfo> & newNodes,
                                    const std::vector<std::string> & storageIDs) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    nn->updatePipeline(clientName, oldBlock, newBlock, newNodes, storageIDs);
}

bool FileSystemImpl::getListing(const std::string & src,
                                const std::string & startAfter, bool needLocation,
                                std::vector<FileStatus> & dl) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return nn->getListing(src, startAfter, needLocation, dl);
}

bool FileSystemImpl::renewLease() {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    //protected by LeaseRenewer's lock
    if (0 == openedOutputStream) {
        return false;
    }

    try {
        nn->renewLease(clientName);
        return true;
    } catch (const HdfsException & e) {
        std::string buffer;
        LOG(LOG_ERROR,
            "Failed to renew lease for filesystem which client name is %s, since:\n%s",
            getClientName(), GetExceptionDetail(e, buffer));
    } catch (const std::exception & e) {
        LOG(LOG_ERROR,
            "Failed to renew lease for filesystem which client name is %s, since:\n%s",
            getClientName(), e.what());
    }

    return false;
}

void FileSystemImpl::registerOpenedOutputStream() {
    //protected by LeaseRenewer's lock
    ++openedOutputStream;
}

bool FileSystemImpl::unregisterOpenedOutputStream() {
    //protected by LeaseRenewer's lock
    if (openedOutputStream > 0) {
        --openedOutputStream;
    }

    return  openedOutputStream == 0;
}

/**
 * Create encryption zone for the directory with specific key name
 * @param path the directory path which is to be created.
 * @param keyname The key name of the encryption zone 
 * @return return true if success.
 */

bool FileSystemImpl::createEncryptionZone(const char * path, const char * keyName) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    if (NULL == keyName || !strlen(keyName)) {
        THROW(InvalidParameter, "Invalid input: key name should not be empty");
    }

    return nn->createEncryptionZone(getStandardPath(path), keyName);
}


/**
 * To get encryption zone information.
 * @param path the path which information is to be returned.
 * @return the encryption zone information.
 */

EncryptionZoneInfo FileSystemImpl::getEZForPath(const char * path) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    if (NULL == path || !strlen(path)) {
        THROW(InvalidParameter, "Invalid input: path should not be empty");
    }

    return nn->getEncryptionZoneInfo(getStandardPath(path), NULL);
}

bool FileSystemImpl::listEncryptionZones(const int64_t id,
                                std::vector<EncryptionZoneInfo> & ezl) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return nn->listEncryptionZones(id, ezl);
}

/**
 * list the contents of an encryption zone.
 * @return return the encryption zone information.
 */
EncryptionZoneIterator FileSystemImpl::listEncryptionZone() {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return EncryptionZoneIterator(this, 0);
}
/**
 * list all the contents of encryption zones.
 * @param id the index of the encyrption zones.
 * @return Return a vector of encryption zones information.
 */

std::vector<EncryptionZoneInfo> FileSystemImpl::listAllEncryptionZoneItems() {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    std::vector<EncryptionZoneInfo> retval;
    retval.clear();
    int64_t id = 0;

    EncryptionZoneIterator it;
    it = FileSystemImpl::listEncryptionZone();

    while (it.hasNext()) {
        retval.push_back(it.getNext());
    }
    return retval;
}

}
}
