/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "Exception.h"
#include "ExceptionInternal.h"
#include "FileSystemInter.h"
#include "InputStreamImpl.h"
#include "InputStreamInter.h"
#include "LocalBlockReader.h"
#include "Logger.h"
#include "RemoteBlockReader.h"
#include "server/Datanode.h"
#include "Thread.h"

#include <algorithm>
#include <ifaddrs.h>
#include <inttypes.h>
#include <iostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

namespace Hdfs {
namespace Internal {

/**
 * Collect the textual names by which the local host can be addressed.
 *
 * The result contains the textual form of every IPv4/IPv6 address reported
 * by getifaddrs() plus the name returned by gethostname().
 *
 * @return the set of local addresses and the local hostname.
 * @throw HdfsNetworkException if the interfaces cannot be enumerated, an
 *        address cannot be converted to text, or the hostname cannot be read.
 */
unordered_set<std::string> BuildLocalAddrSet() {
    /*
     * Owns the list returned by getifaddrs() and frees it on every exit
     * path, including when THROW is raised below.
     */
    struct IfAddrsGuard {
        struct ifaddrs * head;

        IfAddrsGuard() : head(NULL) {
        }

        ~IfAddrsGuard() {
            if (head != NULL) {
                freeifaddrs(head);
            }
        }
    } guard;

    if (getifaddrs(&guard.head)) {
        THROW(HdfsNetworkException,
              "InputStreamImpl: cannot get local network interface: %s",
              GetSystemErrorInfo(errno));
    }

    unordered_set<std::string> addrs;
    std::vector<char> host(INET6_ADDRSTRLEN + 1);

    for (struct ifaddrs * cur = guard.head; cur != NULL; cur = cur->ifa_next) {
        struct sockaddr * addr = cur->ifa_addr;

        if (!addr) {
            continue;
        }

        /* Only IPv4 and IPv6 addresses are recorded. */
        const void * rawAddr = NULL;

        if (addr->sa_family == AF_INET) {
            rawAddr = &(reinterpret_cast<struct sockaddr_in *>(addr))->sin_addr;
        } else if (addr->sa_family == AF_INET6) {
            rawAddr = &(reinterpret_cast<struct sockaddr_in6 *>(addr))->sin6_addr;
        } else {
            continue;
        }

        memset(&host[0], 0, host.size());
        const char * pHost = inet_ntop(addr->sa_family, rawAddr, &host[0],
                                       INET6_ADDRSTRLEN);

        if (NULL == pHost) {
            THROW(HdfsNetworkException,
                  "InputStreamImpl: cannot get convert network address to textual form: %s",
                  GetSystemErrorInfo(errno));
        }

        addrs.insert(pHost);
    }

    /*
     * add hostname.
     *
     * sysconf() returns -1 when the limit is indeterminate or on error; in
     * that case fall back to 255, the value of _POSIX_HOST_NAME_MAX, so the
     * buffer is never empty.
     */
    long hostlen = sysconf(_SC_HOST_NAME_MAX);

    if (hostlen <= 0) {
        hostlen = 255;
    }

    host.assign(static_cast<size_t>(hostlen) + 1, '\0');

    if (gethostname(&host[0], host.size())) {
        THROW(HdfsNetworkException,
              "InputStreamImpl: cannot get hostname: %s",
              GetSystemErrorInfo(errno));
    }

    /* gethostname() need not terminate the buffer when the name is truncated. */
    host[host.size() - 1] = '\0';
    addrs.insert(&host[0]);
    return addrs;
}

/**
 * Construct a stream in the closed state with default settings.
 */
InputStreamImpl::InputStreamImpl()
    : closed(true),
      localRead(true),
      readFromUnderConstructedBlock(false),
      verify(true),
      maxGetBlockInfoRetry(3),
      cursor(0),
      endOfCurBlock(0),
      lastBlockBeingWrittenLength(0),
      prefetchSize(0),
      peerCache(NULL) {
#ifdef MOCK
    stub = NULL;
#endif
}

/**
 * Destructor. The body is empty; close() is not called from here.
 */
InputStreamImpl::~InputStreamImpl() {}

void InputStreamImpl::checkStatus() {
    if (closed) {
        THROW(HdfsIOException, "InputStreamImpl: stream is not opened.");
    }

    if (lastError != exception_ptr()) {
        rethrow_exception(lastError);
    }
}


int64_t InputStreamImpl::readBlockLength(const LocatedBlock & b) {
    const std::vector<DatanodeInfo> & nodes = b.getLocations();
    int replicaNotFoundCount = nodes.size();

    for (size_t i = 0; i < nodes.size(); ++i) {
        try {
            int64_t n = 0;
            shared_ptr<Datanode> dn;
            RpcAuth a = auth;
            a.getUser().addToken(b.getToken());
#ifdef MOCK

            if (stub) {
                dn = stub->getDatanode();
            } else {
                dn = shared_ptr < Datanode > (new DatanodeImpl(nodes[i].getIpAddr().c_str(),
                                              nodes[i].getIpcPort(), *conf, a));
            }

#else
            dn = shared_ptr < Datanode > (new DatanodeImpl(nodes[i].getIpAddr().c_str(),
                                          nodes[i].getIpcPort(), *conf, a));
#endif
            n = dn->getReplicaVisibleLength(b);

            if (n >= 0) {
                return n;
            }
        } catch (const ReplicaNotFoundException & e) {
            std::string buffer;
            LOG(LOG_ERROR,
                "InputStreamImpl: failed to get block visible length for Block: %s file %s from Datanode: %s\n%s",
                b.toString().c_str(), path.c_str(), nodes[i].formatAddress().c_str(), GetExceptionDetail(e, buffer));
            LOG(INFO,
                "InputStreamImpl: retry get block visible length for Block: %s file %s from other datanode",
                b.toString().c_str(), path.c_str());
            --replicaNotFoundCount;
        } catch (const HdfsIOException & e) {
            std::string buffer;
            LOG(LOG_ERROR,
                "InputStreamImpl: failed to get block visible length for Block: %s file %s from Datanode: %s\n%s",
                b.toString().c_str(), path.c_str(), nodes[i].formatAddress().c_str(), GetExceptionDetail(e, buffer));
            LOG(INFO,
                "InputStreamImpl: retry get block visible length for Block: %s file %s from other datanode",
                b.toString().c_str(), path.c_str());
        }
    }

    // Namenode told us about these locations, but none know about the replica
    // means that we hit the race between pipeline creation start and end.
    // we require all 3 because some other exception could have happened
    // on a DN that has it.  we want to report that error
    if (replicaNotFoundCount == 0) {
        return 0;
    }

    return -1;
}

/**
 * Fetch block locations for the file from the namenode and store them in lbs.
 *
 * The request starts at the current cursor and covers prefetchSize bytes.
 * When the last block is not complete, its visible length is obtained with
 * readBlockLength() and recorded both in lastBlockBeingWrittenLength and in
 * the block itself. The whole procedure is attempted at most
 * maxGetBlockInfoRetry times.
 *
 * @throw HdfsIOException if the visible length of the last block still
 *        cannot be determined on the final attempt.
 * @throw HdfsRpcException rethrown when the final attempt fails with it.
 */
void InputStreamImpl::updateBlockInfos() {
    // NOTE(review): if this value is <= 0 the loop never runs and the function
    // returns without fetching anything - confirm the configuration forbids it.
    int retry = maxGetBlockInfoRetry;

    for (int i = 0; i < retry; ++i) {
        try {
            if (!lbs) {
                lbs = shared_ptr < LocatedBlocksImpl > (new LocatedBlocksImpl);
            }

            filesystem->getBlockLocations(path, cursor, prefetchSize, *lbs);

            if (lbs->isLastBlockComplete()) {
                lastBlockBeingWrittenLength = 0;
            } else {
                shared_ptr<LocatedBlock> last = lbs->getLastBlock();

                if (!last) {
                    lastBlockBeingWrittenLength = 0;
                } else {
                    lastBlockBeingWrittenLength = readBlockLength(*last);

                    // -1 means no datanode could report the visible length.
                    if (lastBlockBeingWrittenLength == -1) {
                        if (i + 1 >= retry) {
                            // Last attempt: give up.
                            THROW(HdfsIOException,
                                  "InputStreamImpl: failed to get block visible length for Block: %s from all Datanode.",
                                  last->toString().c_str());
                        } else {
                            LOG(LOG_ERROR,
                                "InputStreamImpl: failed to get block visible length for Block: %s file %s from all Datanode.",
                                last->toString().c_str(), path.c_str());

                            // Wait before asking again; anything thrown by
                            // the sleep is deliberately ignored.
                            try {
                                sleep_for(milliseconds(4000));
                            } catch (...) {
                            }

                            // Skips the "retry" log at the bottom of the loop.
                            continue;
                        }
                    }

                    last->setNumBytes(lastBlockBeingWrittenLength);
                }
            }

            return;
        } catch (const HdfsRpcException & e) {
            std::string buffer;
            LOG(LOG_ERROR,
                "InputStreamImpl: failed to get block information for file %s, %s",
                path.c_str(), GetExceptionDetail(e, buffer));

            // Only the last attempt propagates the RPC failure.
            if (i + 1 >= retry) {
                throw;
            }
        }

        // Reached only after a swallowed HdfsRpcException.
        LOG(INFO,
            "InputStreamImpl: retry to get block information for file: %s, already tried %d time(s).",
            path.c_str(), i + 1);
    }
}

/**
 * Compute the readable length of the file from the cached block information.
 *
 * @return the length reported by lbs, plus lastBlockBeingWrittenLength when
 *         the last block is not complete.
 */
int64_t InputStreamImpl::getFileLength() {
    const int64_t reported = lbs->getFileLength();

    if (lbs->isLastBlockComplete()) {
        return reported;
    }

    return reported + lastBlockBeingWrittenLength;
}

/**
 * Make the given block the current one.
 *
 * A copy of the block is stored in curBlock, endOfCurBlock is set to the end
 * offset of the block, and the failed node list and the block reader are
 * cleared. No new block reader is created here.
 *
 * @param lb the block containing the current cursor.
 */
void InputStreamImpl::seekToBlock(const LocatedBlock & lb) {
    /* At or past the length reported by lbs: reading the block under construction. */
    readFromUnderConstructedBlock = (cursor >= lbs->getFileLength());
    assert(!readFromUnderConstructedBlock || !lbs->isLastBlockComplete());

    assert(cursor >= lb.getOffset()
           && cursor < lb.getOffset() + lb.getNumBytes());

    curBlock = shared_ptr<LocatedBlock>(new LocatedBlock(lb));

    const int64_t length = curBlock->getNumBytes();
    assert(length > 0);
    endOfCurBlock = curBlock->getOffset() + length;

    failedNodes.clear();
    blockReader.reset();
}

bool InputStreamImpl::choseBestNode() {
    const std::vector<DatanodeInfo> & nodes = curBlock->getLocations();

    for (size_t i = 0; i < nodes.size(); ++i) {
        if (std::binary_search(failedNodes.begin(), failedNodes.end(),
                               nodes[i])) {
            continue;
        }

        curNode = nodes[i];
        return true;
    }

    return false;
}

/**
 * Check whether the address of curNode is one of the local addresses.
 *
 * The local address set is built once by BuildLocalAddrSet() on first use.
 *
 * @return true if curNode's IP address is in the local address set.
 */
bool InputStreamImpl::isLocalNode() {
    static const unordered_set<std::string> LocalAddrSet = BuildLocalAddrSet();
    return LocalAddrSet.count(curNode.getIpAddr()) != 0;
}

/**
 * Create a block reader for the current block, positioned at the cursor.
 *
 * Candidate datanodes come from choseBestNode(). A LocalBlockReader is tried
 * first when local read is enabled and not temporarily disabled, the block is
 * not under construction and the node is local. If that attempt fails with
 * HdfsIOException or yields no short-circuit info, the same node is tried
 * again with a RemoteBlockReader. A node whose remote attempt fails with
 * HdfsIOException is added to failedNodes and the next candidate is tried.
 *
 * @param temporaryDisableLocalRead when true, LocalBlockReader is not tried.
 * @throw HdfsIOException when no candidate node is left; it is nested around
 *        the last recorded failure if there was one.
 */
void InputStreamImpl::setupBlockReader(bool temporaryDisableLocalRead) {
    // True when the previous loop iteration attempted a local read; used to
    // force the remote reader on the next attempt for the same node.
    bool lastReadFromLocal = false;
    exception_ptr lastException;

    while (true) {
        if (!choseBestNode()) {
            // No node left. Rethrow the last failure inside a try block so
            // that NESTED_THROW can wrap it.
            try {
                if (lastException) {
                    rethrow_exception(lastException);
                }
            } catch (...) {
                NESTED_THROW(HdfsIOException,
                             "InputStreamImpl: all nodes have been tried and no valid replica can be read for Block: %s.",
                             curBlock->toString().c_str());
            }

            // No previous failure was recorded: throw a plain exception.
            THROW(HdfsIOException,
                  "InputStreamImpl: all nodes have been tried and no valid replica can be read for Block: %s.",
                  curBlock->toString().c_str());
        }

        try {
            // offset: position inside the block; len: bytes left in the block.
            int64_t offset, len;
            offset = cursor - curBlock->getOffset();
            assert(offset >= 0);
            len = curBlock->getNumBytes() - offset;
            assert(len > 0);

            if (!temporaryDisableLocalRead && !lastReadFromLocal &&
                !readFromUnderConstructedBlock && localRead && isLocalNode()) {
                lastReadFromLocal = true;

                shared_ptr<ReadShortCircuitInfo> info;
                ReadShortCircuitInfoBuilder builder(curNode, auth, *conf);

                try {
                    info = builder.fetchOrCreate(*curBlock, curBlock->getToken());

                    if (!info) {
                        // No info available: loop again; lastReadFromLocal is
                        // set, so the same node is tried with a remote reader.
                        continue;
                    }

                    assert(info->isValid());
                    blockReader = shared_ptr<BlockReader>(
                        new LocalBlockReader(info, *curBlock, offset, verify,
                                             *conf, localReaderBuffer));
                } catch (...) {
                    // Mark the info as invalid before propagating the error.
                    if (info) {
                        info->setValid(false);
                    }

                    throw;
                }
            } else {
                const char * clientName = filesystem->getClientName();
                lastReadFromLocal = false;
                blockReader = shared_ptr<BlockReader>(new RemoteBlockReader(
                    *curBlock, curNode, *peerCache, offset, len,
                    curBlock->getToken(), clientName, verify, *conf));
            }

            // A reader was created successfully.
            break;
        } catch (const HdfsIOException & e) {
            // Remember the failure so it can be nested when all nodes fail.
            lastException = current_exception();
            std::string buffer;

            if (lastReadFromLocal) {
                LOG(LOG_ERROR,
                    "cannot setup block reader for Block: %s file %s on Datanode: %s.\n%s\n"
                    "retry the same node but disable read shortcircuit feature",
                    curBlock->toString().c_str(), path.c_str(),
                    curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));
                /*
                 * do not add node into failedNodes since we will retry the same node but
                 * disable local block reading
                 */
            } else {
                LOG(LOG_ERROR,
                    "cannot setup block reader for Block: %s file %s on Datanode: %s.\n%s\nretry another node",
                    curBlock->toString().c_str(), path.c_str(),
                    curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));
                failedNodes.push_back(curNode);
                // Keep the list sorted: choseBestNode() uses binary_search.
                std::sort(failedNodes.begin(), failedNodes.end());
            }
        }
    }
}

/**
 * Open a file for reading.
 *
 * If opening fails for any reason the stream is closed again before the
 * exception is propagated.
 *
 * @param fs the file system the file belongs to.
 * @param path the path of the file; must not be NULL or empty.
 * @param verifyChecksum whether checksums are verified while reading.
 * @throw InvalidParameter if path is NULL or empty.
 */
void InputStreamImpl::open(shared_ptr<FileSystemInter> fs, const char * path,
                           bool verifyChecksum) {
    const bool missingPath = (NULL == path) || ('\0' == path[0]);

    if (missingPath) {
        THROW(InvalidParameter, "path is invalid.");
    }

    try {
        openInternal(fs, path, verifyChecksum);
    } catch (...) {
        /* Release whatever openInternal managed to set up. */
        close();
        throw;
    }
}

/**
 * Initialise the stream for the given file.
 *
 * Copies the session settings from the file system, fetches the first block
 * locations, marks the stream as open and, when the file status reports the
 * file as encrypted, sets up the CryptoCodec used for decryption.
 *
 * @param fs the file system the file belongs to.
 * @param path the path of the file to open.
 * @param verifyChecksum whether checksums are verified while reading.
 * @throw HdfsCanceled, FileNotFoundException rethrown unchanged.
 * @throw HdfsIOException wrapping any other HdfsException.
 */
void InputStreamImpl::openInternal(shared_ptr<FileSystemInter> fs, const char * path,
                                   bool verifyChecksum) {
    try {
        filesystem = fs;
        verify = verifyChecksum;
        this->path = fs->getStandardPath(path);
        LOG(DEBUG2, "%p, open file %s for read, verfyChecksum is %s", this, this->path.c_str(), (verifyChecksum ? "true" : "false"));
        // Private copy of the session configuration for this stream.
        conf = shared_ptr < SessionConfig > (new SessionConfig(fs->getConf()));
        this->auth = RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getRpcAuthMethod()));
        prefetchSize = conf->getDefaultBlockSize() * conf->getPrefetchSize();
        localRead = conf->isReadFromLocal();
        maxGetBlockInfoRetry = conf->getMaxGetBlockInfoRetry();
        peerCache = &fs->getPeerCache();
        updateBlockInfos();
        // The stream counts as open from here on, before the encryption setup.
        closed = false;
        /* If the file is encrypted, initialize the CryptoCodec. */
        fileStatus = fs->getFileStatus(this->path.c_str());
        FileEncryptionInfo *fileEnInfo = fileStatus.getFileEncryption();
        if (fileStatus.isFileEncrypted()) {
            // NOTE(review): an already existing codec is kept and not
            // re-initialised for the newly opened file - confirm it can never
            // be left over from a previously opened file.
            if (cryptoCodec == NULL) {
                enAuth = shared_ptr<RpcAuth> (
                        new RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getKmsMethod())));
                kcp = shared_ptr<KmsClientProvider> (
                        new KmsClientProvider(enAuth, conf));
                cryptoCodec = shared_ptr<CryptoCodec> (
                        new CryptoCodec(fileEnInfo, kcp, conf->getCryptoBufferSize()));

                // The codec is initialised for decryption at stream offset 0.
                int64_t file_length = 0;
                int ret = cryptoCodec->init(CryptoMethod::DECRYPT, file_length);
                if (ret < 0) {
                    THROW(HdfsIOException, "init CryptoCodec failed, file:%s", this->path.c_str());
                }
            }
         }
    } catch (const HdfsCanceled & e) {
        throw;
    } catch (const FileNotFoundException & e) {
        throw;
    } catch (const HdfsException & e) {
        // Every other HDFS error is reported as a nested HdfsIOException.
        NESTED_THROW(HdfsIOException, "InputStreamImpl: cannot open file: %s.",
                     this->path.c_str());
    }
}

int32_t InputStreamImpl::read(char * buf, int32_t size) {
    checkStatus();

    try {
        int64_t prvious = cursor;
        int32_t done = readInternal(buf, size);
        LOG(DEBUG3, "%p read file %s size is %d, offset %" PRId64 " done %d, next pos %" PRId64, this, path.c_str(), size,
            prvious, done, cursor);
        return done;
    } catch (const HdfsEndOfStream & e) {
        throw;
    } catch (...) {
        lastError = current_exception();
        throw;
    }
}

/**
 * Read from the current block, creating and replacing block readers as needed.
 *
 * The read is limited to the bytes left in the current block. When a read
 * fails, a local reader is replaced by a remote reader on the same node; a
 * failed remote reader causes its node to be added to failedNodes.
 *
 * @param buf the buffer to fill.
 * @param size the maximum number of bytes to read.
 * @param shouldUpdateMetadataOnFailure when true, a failure to set up any
 *        block reader is reported by returning -1 instead of throwing.
 * @return the number of bytes read, or -1 to ask the caller to refresh the
 *         block information and try again.
 * @throw HdfsIOException when the reader cannot be set up and
 *        shouldUpdateMetadataOnFailure is false, or when a read fails and
 *        the configuration forbids retrying another node.
 */
int32_t InputStreamImpl::readOneBlock(char * buf, int32_t size, bool shouldUpdateMetadataOnFailure) {
    bool temporaryDisableLocalRead = false;
    std::string buffer;

    while (true) {
        try {
            /*
             * Setup block reader here and handle failure.
             */
            if (!blockReader) {
                setupBlockReader(temporaryDisableLocalRead);
                // The flag applies to a single setup attempt only.
                temporaryDisableLocalRead = false;
            }
        } catch (const HdfsInvalidBlockToken & e) {
            // NOTE(review): this path returns -1 regardless of
            // shouldUpdateMetadataOnFailure - confirm the caller's retry loop
            // is bounded when the token stays invalid.
            std::string buffer;
            LOG(LOG_ERROR,
                "InputStreamImpl: failed to read Block: %s file %s, \n%s, retry after updating block informations.",
                curBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
            return -1;
        } catch (const HdfsIOException & e) {
            /*
             * In setupBlockReader, we have tried all the replicas.
             * We now update block informations once, and try again.
             */
            if (shouldUpdateMetadataOnFailure) {
                LOG(LOG_ERROR,
                    "InputStreamImpl: failed to read Block: %s file %s, \n%s, retry after updating block informations.",
                    curBlock->toString().c_str(), path.c_str(),
                    GetExceptionDetail(e, buffer));
                return -1;
            } else {
                /*
                 * We have updated block informations and failed again.
                 */
                throw;
            }
        }

        /*
         * Block reader has been setup, read from block reader.
         */
        try {
            // Never read past the end of the current block.
            int32_t todo = size;
            todo = todo < endOfCurBlock - cursor ?
                   todo : static_cast<int32_t>(endOfCurBlock - cursor);
            assert(blockReader);
            todo = blockReader->read(buf, todo);
            cursor += todo;
            /*
             * Exit the loop and function from here if success.
             */
            return todo;
        } catch (const HdfsIOException & e) {
            /*
             * Failed to read from current block reader,
             * add the current datanode to invalid node list and try again.
             */
            LOG(LOG_ERROR,
                "InputStreamImpl: failed to read Block: %s file %s from Datanode: %s, \n%s, "
                "retry read again from another Datanode.",
                curBlock->toString().c_str(), path.c_str(),
                curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));

            // The configuration can forbid falling back to another node.
            if (conf->doesNotRetryAnotherNode()) {
                throw;
            }
        } catch (const ChecksumException & e) {
            // A checksum failure is always retried.
            LOG(LOG_ERROR,
                "InputStreamImpl: failed to read Block: %s file %s from Datanode: %s, \n%s, "
                "retry read again from another Datanode.",
                curBlock->toString().c_str(), path.c_str(),
                curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));
        }

        /*
         * Successfully create the block reader but failed to read.
         * Disable the local block reader and try the same node again.
         */
        if (!blockReader || dynamic_cast<LocalBlockReader *>(blockReader.get())) {
            temporaryDisableLocalRead = true;
        } else {
            /*
             * Remote block reader failed to read, try another node.
             */
            LOG(INFO, "IntputStreamImpl: Add invalid datanode %s to failed datanodes and try another datanode again for file %s.",
                curNode.formatAddress().c_str(), path.c_str());
            failedNodes.push_back(curNode);
            // Keep the list sorted for the binary search in choseBestNode().
            std::sort(failedNodes.begin(), failedNodes.end());
        }

        // Drop the failed reader so the next iteration sets up a new one.
        blockReader.reset();
    }
}

/**
 * Read data from the file at the current cursor.
 *
 * Block information is refreshed from the namenode when it is missing or
 * does not cover the cursor, the stream is positioned on the right block,
 * and at most one block is read from. When the file is encrypted the bytes
 * read are decrypted in place.
 *
 * @param buf the buffer to fill.
 * @param size the capacity of the buffer.
 * @return the number of bytes placed in the buffer; it may be less than size.
 * @throw HdfsEndOfStream if the cursor is at or beyond the end of the file.
 * @throw HdfsCanceled rethrown unchanged.
 * @throw HdfsIOException wrapping any other HdfsException.
 */
int32_t InputStreamImpl::readInternal(char * buf, int32_t size) {
    // Number of times a failed block read may be answered by refreshing the
    // block information; passed to readOneBlock() as a boolean.
    int updateMetadataOnFailure = conf->getMaxReadBlockRetry();

    try {
        do {
            const LocatedBlock * lb = NULL;

            /*
             * Check if we have got the block information we need.
             */
            if (!lbs || cursor >= getFileLength()
                    || (cursor >= endOfCurBlock && !(lb = lbs->findBlock(cursor)))) {
                /*
                 * Get block information from namenode.
                 * Do RPC failover work in updateBlockInfos.
                 */
                updateBlockInfos();

                /*
                 * We already have the up-to-date block information,
                 * Check if we reach the end of file.
                 */
                if (cursor >= getFileLength()) {
                    THROW(HdfsEndOfStream,
                          "InputStreamImpl: read over EOF, current position: %" PRId64 ", read size: %d, from file: %s",
                          cursor, size, path.c_str());
                }
            }

            /*
             * If we reach the end of block or the block information has just updated,
             * seek to the right block to read.
             */
            if (cursor >= endOfCurBlock) {
                lb = lbs->findBlock(cursor);

                if (!lb) {
                    THROW(HdfsIOException,
                          "InputStreamImpl: cannot find block information at position: %" PRId64 " for file: %s",
                          cursor, path.c_str());
                }

                /*
                 * Seek to the right block, setup all needed variable,
                 * but do not setup block reader, setup it latter.
                 */
                seekToBlock(*lb);
            }

            int32_t retval = readOneBlock(buf, size, updateMetadataOnFailure > 0);

            /*
             * Now we have tried all replicas and failed.
             * We will update metadata once and try again.
             */
            if (retval < 0) {
                // Dropping lbs and endOfCurBlock forces the next iteration to
                // fetch block information again and re-seek to the block.
                lbs.reset();
                endOfCurBlock = 0;
                --updateMetadataOnFailure;

                // Wait before retrying; anything thrown by the sleep is
                // deliberately ignored.
                try {
                    sleep_for(seconds(1));
                } catch (...) {
                }

                continue;
            }
            std::string bufDecode;
            if (fileStatus.isFileEncrypted()) {
                /* Decrypt buffer if the file is encrypted. */
                // NOTE(review): assumes cipher_wrap() returns at least retval
                // bytes - confirm against CryptoCodec.
                bufDecode = cryptoCodec->cipher_wrap(buf, retval);
                memcpy(buf, bufDecode.c_str(), retval);
            }

            return retval;
        } while (true);
    } catch (const HdfsCanceled & e) {
        throw;
    } catch (const HdfsEndOfStream & e) {
        throw;
    } catch (const HdfsException & e) {
        /*
         * wrap the underlying error and rethrow.
         */
        NESTED_THROW(HdfsIOException,
                     "InputStreamImpl: cannot read file: %s, from position %" PRId64 ", size: %d.",
                     path.c_str(), cursor, size);
    }
}

/**
 * To read data from hdfs, block until get the given size of bytes.
 * @param buf the buffer used to filled.
 * @param size the number of bytes to be read.
 */
void InputStreamImpl::readFully(char * buf, int64_t size) {
    LOG(DEBUG3, "readFully file %s size is %" PRId64 ", offset %" PRId64, path.c_str(), size, cursor);
    checkStatus();

    try {
        return readFullyInternal(buf, size);
    } catch (const HdfsEndOfStream & e) {
        throw;
    } catch (...) {
        lastError = current_exception();
        throw;
    }
}

void InputStreamImpl::readFullyInternal(char * buf, int64_t size) {
    int32_t done;
    int64_t pos = cursor, todo = size;

    try {
        while (todo > 0) {
            done = todo < std::numeric_limits<int32_t>::max() ?
                   static_cast<int32_t>(todo) :
                   std::numeric_limits<int32_t>::max();
            done = readInternal(buf + (size - todo), done);
            todo -= done;
        }
    } catch (const HdfsCanceled & e) {
        throw;
    } catch (const HdfsEndOfStream & e) {
        THROW(HdfsEndOfStream,
              "InputStreamImpl: read over EOF, current position: %" PRId64 ", read size: %" PRId64 ", from file: %s",
              pos, size, path.c_str());
    } catch (const HdfsException & e) {
        NESTED_THROW(HdfsIOException,
                     "InputStreamImpl: cannot read fully from file: %s, from position %" PRId64 ", size: %" PRId64 ".",
                     path.c_str(), pos, size);
    }
}

/**
 * Report what the current block reader says is available.
 *
 * Any exception raised by the block reader is remembered in lastError before
 * it is propagated.
 *
 * @return blockReader->available(), or 0 when there is no block reader.
 */
int64_t InputStreamImpl::available() {
    checkStatus();

    int64_t remaining = 0;

    try {
        if (blockReader) {
            remaining = blockReader->available();
        }
    } catch (...) {
        lastError = current_exception();
        throw;
    }

    return remaining;
}

/**
 * Move the file position to the given offset.
 *
 * Any failure is remembered in lastError before it is propagated.
 *
 * @param pos the target position.
 */
void InputStreamImpl::seek(int64_t pos) {
    LOG(DEBUG2, "%p seek file %s to %" PRId64 ", offset %" PRId64, this, path.c_str(), pos, cursor);
    checkStatus();

    try {
        seekInternal(pos);
    } catch (...) {
        /* Record the failure so that later calls fail in checkStatus(). */
        lastError = current_exception();
        throw;
    }
}

void InputStreamImpl::seekInternal(int64_t pos) {
    if (cursor == pos) {
        return;
    }

    if (!lbs || pos > getFileLength()) {
        updateBlockInfos();

        if (pos > getFileLength()) {
            THROW(HdfsEndOfStream,
                  "InputStreamImpl: seek over EOF, current position: %" PRId64 ", seek target: %" PRId64 ", in file: %s",
                  cursor, pos, path.c_str());
        }
    }

    try {
        if (blockReader && pos > cursor && pos < endOfCurBlock && (pos - cursor) < blockReader->available()) {
            blockReader->skip(pos - cursor);
            cursor = pos;
            if (cryptoCodec) {
                int ret = cryptoCodec->resetStreamOffset(CryptoMethod::DECRYPT,
                        cursor);
                if (ret < 0) {
                    THROW(HdfsIOException, "Reset offset failed, file:%s",
                            this->path.c_str());
                }
            }
            return;
        }
    } catch (const HdfsIOException & e) {
        std::string buffer;
        LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64 " bytes in current block reader for file %s\n%s",
            pos - cursor, path.c_str(), GetExceptionDetail(e, buffer));
        LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64 " for file %s", pos, path.c_str());
    } catch (const ChecksumException & e) {
        std::string buffer;
        LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64 " bytes in current block reader for file %s\n%s",
            pos - cursor, path.c_str(), GetExceptionDetail(e, buffer));
        LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64 " for file %s", pos, path.c_str());
    }

    /**
     * the seek target exceed the current block or skip failed in current block reader.
     * reset current block reader and set the cursor to the target position to seek.
     */
    endOfCurBlock = 0;
    blockReader.reset();
    cursor = pos;
    if (cryptoCodec) {
        int ret = cryptoCodec->resetStreamOffset(CryptoMethod::DECRYPT, cursor);
        if (ret < 0) {
            THROW(HdfsIOException, "init CryptoCodec failed, file:%s", this->path.c_str());
        }
    }
}

/**
 * Report the current file position.
 *
 * @return the current value of the cursor.
 */
int64_t InputStreamImpl::tell() {
    checkStatus();

    const int64_t position = cursor;
    LOG(DEBUG2, "tell file %s at %" PRId64, path.c_str(), position);
    return position;
}

/**
 * Close the stream.
 */
void InputStreamImpl::close() {
    LOG(DEBUG2, "%p close file %s for read", this, path.c_str());
    closed = true;
    localRead = true;
    readFromUnderConstructedBlock = false;
    verify = true;
    filesystem.reset();
    cursor = 0;
    endOfCurBlock = 0;
    lastBlockBeingWrittenLength = 0;
    prefetchSize = 0;
    blockReader.reset();
    curBlock.reset();
    lbs.reset();
    conf.reset();
    failedNodes.clear();
    path.clear();
    localReaderBuffer.resize(0);
    lastError = exception_ptr();
}

/**
 * Describe the stream for diagnostics.
 *
 * @return "InputStream (not opened)" when no path is set, otherwise
 *         "InputStream for path " followed by the path.
 */
std::string InputStreamImpl::toString() {
    /* close() clears the path, so an empty path means no file is open. */
    if (path.empty()) {
        return std::string("InputStream (not opened)");
    }

    return std::string("InputStream for path ") + path;
}

}
}
