/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "DateTime.h"
#include "Pipeline.h"
#include "Logger.h"
#include "Exception.h"
#include "ExceptionInternal.h"
#include "OutputStreamInter.h"
#include "FileSystemInter.h"
#include "DataTransferProtocolSender.h"
#include "datatransfer.pb.h"

#include <inttypes.h>

namespace Hdfs {
namespace Internal {

/*
 * Build a write pipeline for file `path` and leave it in the DATA_STREAMING stage.
 *
 * @param append       true to reopen a pipeline on the existing last block
 *                     (`lastBlock` must be non-null and its current locations are
 *                     used); false to allocate a brand-new block.
 * @param path         file being written.
 * @param conf         session configuration (retry count, timeouts, whether a
 *                     replacement datanode may be added on failure).
 * @param filesystem   filesystem handle used for namenode RPCs.
 * @param bytesSent    initial value of both bytesSent and bytesAcked.
 * @param lastBlock    last block of the file, or null when there is none.
 */
PipelineImpl::PipelineImpl(bool append, const char * path, const SessionConfig & conf,
                           shared_ptr<FileSystemInter> filesystem, int checksumType, int chunkSize,
                           int replication, int64_t bytesSent, PacketPool & packetPool, shared_ptr<LocatedBlock> lastBlock) :
    checksumType(checksumType), chunkSize(chunkSize), errorIndex(-1), replication(replication), bytesAcked(
        bytesSent), bytesSent(bytesSent), packetPool(packetPool), filesystem(filesystem), lastBlock(lastBlock), path(
            path) {
    canAddDatanode = conf.canAddDatanode();
    blockWriteRetry = conf.getBlockWriteRetry();
    connectTimeout = conf.getOutputConnTimeout();
    readTimeout = conf.getOutputReadTimeout();
    writeTimeout = conf.getOutputWriteTimeout();
    clientName = filesystem->getClientName();

    if (append) {
        /*
         * Appending requires an existing block; check it before lastBlock is
         * dereferenced for logging below.
         */
        assert(lastBlock);
        LOG(DEBUG2, "create pipeline for file %s to append to %s at position %" PRId64,
            path, lastBlock->toString().c_str(), lastBlock->getNumBytes());
        stage = PIPELINE_SETUP_APPEND;
        nodes = lastBlock->getLocations();
        storageIDs = lastBlock->getStorageIDs();
        buildForAppendOrRecovery(false);
        stage = DATA_STREAMING;
    } else {
        LOG(DEBUG2, "create pipeline for file %s to write to a new block", path);
        stage = PIPELINE_SETUP_CREATE;
        buildForNewBlock();
        stage = DATA_STREAMING;
    }
}

/*
 * Locate the datanode that the namenode added to the pipeline.
 *
 * @param original  the pipeline as it was before the extra datanode was requested.
 * @return index in `nodes` of the first entry that does not appear in `original`.
 * @throws HdfsIOException if `nodes` is not exactly one element longer than
 *         `original`, or if every entry of `nodes` already appears in `original`.
 */
int PipelineImpl::findNewDatanode(const std::vector<DatanodeInfo> & original) {
    if (original.size() + 1 != nodes.size()) {
        THROW(HdfsIOException, "Failed to acquire a datanode for block %s from namenode.",
              lastBlock->toString().c_str());
    }

    for (size_t candidate = 0; candidate < nodes.size(); ++candidate) {
        bool alreadyPresent = false;

        for (size_t k = 0; !alreadyPresent && k < original.size(); ++k) {
            alreadyPresent = (nodes[candidate] == original[k]);
        }

        if (!alreadyPresent) {
            return static_cast<int>(candidate);
        }
    }

    THROW(HdfsIOException, "Cannot add new datanode for block %s.", lastBlock->toString().c_str());
}

/*
 * Ask datanode `src` to copy replica `blk` to `targets` (TRANSFER_BLOCK) and
 * wait for its response on a dedicated connection.
 *
 * @param blk      block whose replica is copied.
 * @param src      datanode that currently holds the replica.
 * @param targets  datanodes that should receive the replica; targets[0] is
 *                 named in the error message on failure.
 * @param token    block access token sent with the request.
 * @throws HdfsIOException if the response length is negative, the response
 *         cannot be parsed, or its status is not success.
 */
void PipelineImpl::transfer(const ExtendedBlock & blk, const DatanodeInfo & src,
                            const std::vector<DatanodeInfo> & targets, const Token & token) {
    shared_ptr<Socket> so(new TcpSocketImpl);
    shared_ptr<BufferedSocketReader> in(new BufferedSocketReaderImpl(*so));
    so->connect(src.getIpAddr().c_str(), src.getXferPort(), connectTimeout);
    DataTransferProtocolSender sender(*so, writeTimeout, src.formatAddress());
    sender.transferBlock(blk, token, clientName.c_str(), targets);
    int size = in->readVarint32(readTimeout);

    /*
     * A negative length would make the vector constructor throw std::length_error,
     * which is not an HdfsException and would escape the caller's error handling.
     */
    if (size < 0) {
        THROW(HdfsIOException, "invalid response length %d from datanode %s for block %s.",
              size, src.formatAddress().c_str(), lastBlock->toString().c_str());
    }

    /* Keep at least one byte so &buf[0] is valid even for an empty message. */
    std::vector<char> buf(size > 0 ? size : 1);
    in->readFully(&buf[0], size, readTimeout);
    BlockOpResponseProto resp;

    if (!resp.ParseFromArray(&buf[0], size)) {
        THROW(HdfsIOException, "cannot parse datanode response from %s for block %s.",
              src.formatAddress().c_str(), lastBlock->toString().c_str());
    }

    if (Status::DT_PROTO_SUCCESS != resp.status()) {
        THROW(HdfsIOException, "Failed to transfer block to a new datanode %s for block %s.",
              targets[0].formatAddress().c_str(),
              lastBlock->toString().c_str());
    }
}

bool PipelineImpl::addDatanodeToPipeline(const std::vector<DatanodeInfo> & excludedNodes) {
    try {
        /*
         * get a new datanode
         */
        std::vector<DatanodeInfo> original = nodes;
        shared_ptr<LocatedBlock> lb;
        lb = filesystem->getAdditionalDatanode(path, *lastBlock, nodes, storageIDs,
                                               excludedNodes, 1);
        nodes = lb->getLocations();
        storageIDs = lb->getStorageIDs();

        /*
         * failed to add new datanode into pipeline.
         */
        if (original.size() == nodes.size()) {
            LOG(LOG_ERROR,
                "Failed to add new datanode into pipeline for block: %s file %s.",
                lastBlock->toString().c_str(), path.c_str());
        } else {
            /*
             * find the new datanode
             */
            int d = findNewDatanode(original);
            /*
             * in case transfer block fail.
             */
            errorIndex = d;
            /*
             * transfer replica
             */
            DatanodeInfo & src = d == 0 ? nodes[1] : nodes[d - 1];
            std::vector<DatanodeInfo> targets;
            targets.push_back(nodes[d]);
            LOG(INFO, "Replicate block %s from %s to %s for file %s.", lastBlock->toString().c_str(),
                src.formatAddress().c_str(), targets[0].formatAddress().c_str(), path.c_str());
            transfer(*lastBlock, src, targets, lb->getToken());
            errorIndex = -1;
            return true;
        }
    } catch (const HdfsCanceled & e) {
        throw;
    } catch (const HdfsFileSystemClosed & e) {
        throw;
    } catch (const SafeModeException & e) {
        throw;
    } catch (const HdfsException & e) {
        std::string buffer;
        LOG(LOG_ERROR,
            "Failed to add a new datanode into pipeline for block: %s file %s.\n%s",
            lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
    }

    return false;
}

/*
 * Log a warning listing the pipeline members when the pipeline has fewer
 * datanodes than the requested replication factor. The pipeline is not changed.
 */
void PipelineImpl::checkPipelineWithReplicas() {
    if (static_cast<int>(nodes.size()) >= replication) {
        return;
    }

    std::stringstream members;
    members.imbue(std::locale::classic());

    if (nodes.empty()) {
        members << "Empty";
    } else {
        for (size_t k = 0; k < nodes.size(); ++k) {
            if (k != 0) {
                members << ", ";
            }

            members << nodes[k].formatAddress();
        }
    }

    LOG(WARNING,
        "the number of nodes in pipeline is %d [%s], is less than the expected number of replica %d for block %s file %s",
        static_cast<int>(nodes.size()), members.str().c_str(), replication,
        lastBlock->toString().c_str(), path.c_str());
}

/*
 * Re-establish the write pipeline for lastBlock, either to append to an existing
 * block or to recover after a datanode failure.
 *
 * Each attempt:
 *  - if errorIndex >= 0, removes nodes[errorIndex] (and its storage ID, if any)
 *    from the pipeline and records it in excludedNodes; throws if no node is left;
 *  - outside the PIPELINE_SETUP_CREATE / PIPELINE_CLOSE stages, and only when
 *    canAddDatanode is set, tries to add a replacement datanode if the pipeline
 *    is shorter than `replication`;
 *  - fetches a new generation stamp and token (updateBlockForPipeline) and calls
 *    createBlockOutputStream.
 * After a successful attempt the namenode is informed of the new pipeline
 * (updatePipeline) and lastBlock is replaced by the updated block.
 *
 * @param recovery  request the recovery variant of the current stage; it is
 *                  forced to true after any failed attempt.
 * @throws HdfsIOException if every datanode has been removed. Once the retry
 *         budget (blockWriteRetry) is exhausted, rethrows the last exception
 *         caught from an attempt (HdfsInvalidBlockToken, HdfsTimeoutException or
 *         HdfsIOException). Only attempts that did not blame a specific datanode
 *         consume the budget. Other exception types propagate immediately.
 */
void PipelineImpl::buildForAppendOrRecovery(bool recovery) {
    int64_t gs = 0;
    int retry = blockWriteRetry;
    /* Most recent attempt failure; cleared on success, rethrown after the loop. */
    exception_ptr lastException;
    /* Datanodes removed from the pipeline; passed to addDatanodeToPipeline. */
    std::vector<DatanodeInfo> excludedNodes;
    shared_ptr<LocatedBlock> lb;
    std::string buffer;

    do {
        /*
         * Remove bad datanode from list of datanodes.
         * If errorIndex was not set (i.e. appends), then do not remove
         * any datanodes
         */
        if (errorIndex >= 0) {
            assert(lastBlock);
            LOG(LOG_ERROR, "Pipeline: node %s is invalid and removed from pipeline when %s block %s for file %s, stage = %s.",
                nodes[errorIndex].formatAddress().c_str(),
                (recovery ? "recovery" : "append to"), lastBlock->toString().c_str(),
                path.c_str(), StageToString(stage));
            excludedNodes.push_back(nodes[errorIndex]);
            nodes.erase(nodes.begin() + errorIndex);

            /* Keep storageIDs index-aligned with nodes. */
            if (!storageIDs.empty()) {
                storageIDs.erase(storageIDs.begin() + errorIndex);
            }

            if (nodes.empty()) {
                THROW(HdfsIOException,
                      "Build pipeline to %s block %s failed: all datanodes are bad.",
                      (recovery ? "recovery" : "append to"), lastBlock->toString().c_str());
            }

            errorIndex = -1;
        }

        try {
            gs = 0;

            /*
             * Check if the number of datanodes in pipeline satisfy the replication requirement,
             * add new datanode if not
             */
            if (stage != PIPELINE_SETUP_CREATE && stage != PIPELINE_CLOSE
                    && static_cast<int>(nodes.size()) < replication && canAddDatanode) {
                if (!addDatanodeToPipeline(excludedNodes)) {
                    THROW(HdfsIOException,
                          "Failed to add new datanode into pipeline for block: %s file %s, "
                          "set \"output.replace-datanode-on-failure\" to \"false\" to disable this feature.",
                          lastBlock->toString().c_str(), path.c_str());
                }
            }

            /*
             * NOTE(review): addDatanodeToPipeline resets errorIndex on success and a
             * failure throws above, so this branch looks unreachable. If taken,
             * `continue` in a do-while jumps straight to the loop condition without
             * consuming a retry.
             */
            if (errorIndex >= 0) {
                continue;
            }

            checkPipelineWithReplicas();
            /*
             * Update generation stamp and access token
             */
            lb = filesystem->updateBlockForPipeline(*lastBlock);
            gs = lb->getGenerationStamp();
            /*
             * Try to build pipeline
             */
            createBlockOutputStream(lb->getToken(), gs, recovery);
            /*
             * everything is ok, reset errorIndex.
             */
            errorIndex = -1;
            lastException = exception_ptr();
            break; //break on success
        } catch (const HdfsInvalidBlockToken & e) {
            lastException = current_exception();
            recovery = true;
            LOG(LOG_ERROR,
                "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s",
                lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
            LOG(INFO, "Try to recovery pipeline for block %s file %s.",
                lastBlock->toString().c_str(), path.c_str());
        } catch (const HdfsTimeoutException & e) {
            lastException = current_exception();
            recovery = true;
            LOG(LOG_ERROR,
                "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s",
                lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
            LOG(INFO, "Try to recovery pipeline for block %s file %s.",
                lastBlock->toString().c_str(), path.c_str());
        } catch (const HdfsIOException & e) {
            lastException = current_exception();
            /*
             * Set recovery flag to true in case of failed to create a pipeline for appending.
             */
            recovery = true;
            LOG(LOG_ERROR,
                "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s",
                lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
            LOG(INFO, "Try to recovery pipeline for block %s file %s.", lastBlock->toString().c_str(), path.c_str());
        }

        /*
         * we don't known what happened, no datanode is reported failure, reduce retry count in case infinite loop.
         * it may caused by rpc call throw HdfsIOException
         */
        if (errorIndex < 0) {
            --retry;
        }
    } while (retry > 0);

    /* Non-null only if the loop ended without a successful attempt. */
    if (lastException) {
        rethrow_exception(lastException);
    }

    /*
     * Update pipeline at the namenode, non-idempotent RPC call.
     */
    lb->setPoolId(lastBlock->getPoolId());
    lb->setBlockId(lastBlock->getBlockId());
    lb->setLocations(nodes);
    lb->setStorageIDs(storageIDs);
    lb->setNumBytes(lastBlock->getNumBytes());
    lb->setOffset(lastBlock->getOffset());
    filesystem->updatePipeline(*lastBlock, *lb, nodes, storageIDs);
    lastBlock = lb;
}

/*
 * Ask the namenode to allocate the block that follows lastBlock and store it in
 * lastBlock.
 *
 * NotReplicatedYetException is retried up to blockWriteRetry times with
 * exponential back-off (100ms, doubling, capped at 5s); any other exception
 * propagates immediately.
 *
 * @param excludedNodes  datanodes passed to addBlock as the exclusion list.
 * @throws NotReplicatedYetException once the retries are used up.
 */
void PipelineImpl::locateNextBlock(
    const std::vector<DatanodeInfo> & excludedNodes) {
    milliseconds sleeptime(100);
    milliseconds fiveSeconds(5000);
    int retry = blockWriteRetry;

    while (true) {
        try {
            lastBlock = filesystem->addBlock(path, lastBlock.get(),
                                             excludedNodes);
            assert(lastBlock);
            return;
        } catch (const NotReplicatedYetException & e) {
            /* lastBlock may be null when there is no previous block yet. */
            std::string lastBlockName = lastBlock ? lastBlock->toString() : "Null";
            LOG(DEBUG1, "Got NotReplicatedYetException when try to addBlock for block %s, "
                "already retry %d times, max retry %d times", lastBlockName.c_str(),
                blockWriteRetry - retry, blockWriteRetry);

            /* `> 0` so a negative retry setting cannot loop forever. */
            if (retry-- > 0) {
                try {
                    sleep_for(sleeptime);
                } catch (...) {
                }

                sleeptime *= 2;
                sleeptime = sleeptime < fiveSeconds ? sleeptime : fiveSeconds;
            } else {
                throw;
            }
        }
    }
}

static std::string FormatExcludedNodes(
    const std::vector<DatanodeInfo> & excludedNodes) {
    std::stringstream ss;
    ss.imbue(std::locale::classic());
    ss << "[";
    int size = excludedNodes.size();

    for (int i = 0; i < size - 1; ++i) {
        ss << excludedNodes[i].formatAddress() << ", ";
    }

    if (excludedNodes.empty()) {
        ss << "Empty";
    } else {
        ss << excludedNodes.back().formatAddress();
    }

    ss << "]";
    return ss.str();
}

void PipelineImpl::buildForNewBlock() {
    int retryAllocNewBlock = 0, retry = blockWriteRetry;
    LocatedBlock lb;
    std::vector<DatanodeInfo> excludedNodes;
    shared_ptr<LocatedBlock> block = lastBlock;
    std::string buffer;

    do {
        errorIndex = -1;
        lastBlock = block;

        try {
            locateNextBlock(excludedNodes);
            lastBlock->setNumBytes(0);
            nodes = lastBlock->getLocations();
            storageIDs = lastBlock->getStorageIDs();
        } catch (const HdfsRpcException & e) {
            const char * lastBlockName = lastBlock ? lastBlock->toString().c_str() : "Null";
            LOG(LOG_ERROR,
                "Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s",
                path.c_str(), lastBlockName, FormatExcludedNodes(excludedNodes).c_str(), GetExceptionDetail(e, buffer));

            if (retryAllocNewBlock > blockWriteRetry) {
                throw;
            }

            LOG(INFO, "Retry to allocate a new empty block for file %s, last block %s, excluded nodes %s.",
                path.c_str(), lastBlockName, FormatExcludedNodes(excludedNodes).c_str());
            ++retryAllocNewBlock;
            continue;
        } catch (const HdfsException & e) {
            const char * lastBlockName = lastBlock ? lastBlock->toString().c_str() : "Null";
            LOG(LOG_ERROR,
                "Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s",
                path.c_str(), lastBlockName, FormatExcludedNodes(excludedNodes).c_str(), GetExceptionDetail(e, buffer));
            throw;
        }

        retryAllocNewBlock = 0;
        checkPipelineWithReplicas();

        if (nodes.empty()) {
            THROW(HdfsIOException,
                  "No datanode is available to create a pipeline for block %s file %s.",
                  lastBlock->toString().c_str(), path.c_str());
        }

        try {
            createBlockOutputStream(lastBlock->getToken(), 0, false);
            break;  //break on success
        } catch (const HdfsInvalidBlockToken & e) {
            LOG(LOG_ERROR,
                "Failed to setup the pipeline for new block %s file %s.\n%s",
                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
        } catch (const HdfsTimeoutException & e) {
            LOG(LOG_ERROR,
                "Failed to setup the pipeline for new block %s file %s.\n%s",
                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
        } catch (const HdfsIOException & e) {
            LOG(LOG_ERROR,
                "Failed to setup the pipeline for new block %s file %s.\n%s",
                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
        }

        LOG(INFO, "Abandoning block: %s for file %s.", lastBlock->toString().c_str(), path.c_str());

        try {
            filesystem->abandonBlock(*lastBlock, path);
        } catch (const HdfsException & e) {
            LOG(LOG_ERROR,
                "Failed to abandon useless block %s for file %s.\n%s",
                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
            throw;
        }

        if (errorIndex >= 0) {
            LOG(INFO, "Excluding invalid datanode: %s for block %s for file %s",
                nodes[errorIndex].formatAddress().c_str(), lastBlock->toString().c_str(), path.c_str());
            excludedNodes.push_back(nodes[errorIndex]);
        } else {
            /*
             * we don't known what happened, no datanode is reported failure, reduce retry count in case of infinite loop.
             */
            --retry;
        }
    } while (retry);
}

/*
 * Validate the firstBadLink string returned by a datanode: it must be either
 * empty or an "IP:PORT" pair with a numeric host and a port in 1..65535.
 *
 * @param n  the firstBadLink value from the datanode's response.
 * @throws HdfsException (after logging at FATAL level) if the string is malformed.
 */
void PipelineImpl::checkBadLinkFormat(const std::string & n) {
    std::string node = n;

    if (node.empty()) {
        return;
    }

    do {
        const char * host = &node[0], *port;
        size_t pos = node.find_last_of(":");

        if (pos == node.npos || pos + 1 == node.length()) {
            break;
        }

        /* Split in place: terminate the host at the last ':' and point port past it. */
        node[pos] = 0;
        port = &node[pos + 1];
        struct addrinfo hints, *addrs;
        memset(&hints, 0, sizeof(hints));
        hints.ai_family = PF_UNSPEC;
        hints.ai_socktype = SOCK_STREAM;
        /* Accept numeric host and port only; no name resolution. */
        hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
        /*
         * strtol returns long; keep it as long so an out-of-range number cannot be
         * narrowed into the valid port range.
         */
        char * end;
        long p = strtol(port, &end, 0);

        if (p >= 65536 || p <= 0 || end != port + strlen(port)) {
            break;
        }

        if (getaddrinfo(host, port, &hints, &addrs)) {
            break;
        }

        freeaddrinfo(addrs);
        return;
    } while (0);

    LOG(FATAL, "Cannot parser the firstBadLink string %s, it should be a bug or protocol incompatible.",
        n.c_str());
    THROW(HdfsException, "Cannot parser the firstBadLink string %s, it should be a bug or protocol incompatible.",
          n.c_str());
}

/*
 * Connect to nodes[0] and send a WRITE_BLOCK request for lastBlock with
 * nodes[1..] as downstream targets, then read the pipeline setup response.
 * On success, `sock` and `reader` refer to the new connection.
 *
 * On failure errorIndex is set to 0, or to the index of the node whose transfer
 * address equals the response's firstBadLink. A non-success status is rethrown
 * as-is (HdfsInvalidBlockToken for an access-token error, HdfsIOException
 * otherwise). Any other HdfsException is wrapped in an HdfsIOException.
 * Non-Hdfs exceptions propagate unwrapped.
 *
 * @param token     block access token sent with the request.
 * @param gs        generation stamp sent with the request.
 * @param recovery  if true, request the recovery variant of the current stage
 *                  (stage | 0x1).
 * @throws HdfsException if firstBadLink is malformed (see checkBadLinkFormat).
 */
void PipelineImpl::createBlockOutputStream(const Token & token, int64_t gs, bool recovery) {
    std::string firstBadLink;
    exception_ptr lastError;
    bool needWrapException = true;

    try {
        sock = shared_ptr < Socket > (new TcpSocketImpl);
        reader = shared_ptr<BufferedSocketReader>(new BufferedSocketReaderImpl(*sock));
        sock->connect(nodes[0].getIpAddr().c_str(), nodes[0].getXferPort(),
                      connectTimeout);
        std::vector<DatanodeInfo> targets;

        for (size_t i = 1; i < nodes.size(); ++i) {
            targets.push_back(nodes[i]);
        }

        DataTransferProtocolSender sender(*sock, writeTimeout,
                                          nodes[0].formatAddress());
        sender.writeBlock(*lastBlock, token, clientName.c_str(), targets,
                          (recovery ? (stage | 0x1) : stage), nodes.size(),
                          lastBlock->getNumBytes(), bytesSent, gs, checksumType, chunkSize);
        int size;
        size = reader->readVarint32(readTimeout);

        /*
         * A negative length would make the vector constructor throw
         * std::length_error, which is not an HdfsException and would escape the
         * wrapping below instead of being reported as a pipeline failure.
         */
        if (size < 0) {
            THROW(HdfsIOException, "invalid response length %d from datanode %s for block %s.",
                  size, nodes[0].formatAddress().c_str(), lastBlock->toString().c_str());
        }

        /* Keep at least one byte so &buf[0] is valid even for an empty message. */
        std::vector<char> buf(size > 0 ? size : 1);
        reader->readFully(&buf[0], size, readTimeout);
        BlockOpResponseProto resp;

        if (!resp.ParseFromArray(&buf[0], size)) {
            THROW(HdfsIOException, "cannot parse datanode response from %s for block %s.",
                  nodes[0].formatAddress().c_str(), lastBlock->toString().c_str());
        }

        Status pipelineStatus = resp.status();
        firstBadLink = resp.firstbadlink();

        if (Status::DT_PROTO_SUCCESS != pipelineStatus) {
            /* These already carry the right type and message; rethrow unwrapped. */
            needWrapException = false;

            if (Status::DT_PROTO_ERROR_ACCESS_TOKEN == pipelineStatus) {
                THROW(HdfsInvalidBlockToken,
                      "Got access token error for connect ack with firstBadLink as %s for block %s",
                      firstBadLink.c_str(), lastBlock->toString().c_str());
            } else {
                THROW(HdfsIOException, "Bad connect ack with firstBadLink as %s for block %s",
                      firstBadLink.c_str(), lastBlock->toString().c_str());
            }
        }

        return;
    } catch (...) {
        /* Blame the first node unless firstBadLink names another one below. */
        errorIndex = 0;
        lastError = current_exception();
    }

    checkBadLinkFormat(firstBadLink);

    if (!firstBadLink.empty()) {
        for (size_t i = 0; i < nodes.size(); ++i) {
            if (nodes[i].getXferAddr() == firstBadLink) {
                errorIndex = i;
                break;
            }
        }
    }

    assert(lastError);

    if (!needWrapException) {
        rethrow_exception(lastError);
    }

    try {
        rethrow_exception(lastError);
    } catch (const HdfsException & e) {
        NESTED_THROW(HdfsIOException,
                     "Cannot create block output stream for block %s, "
                     "recovery flag: %s, with last generate stamp %" PRId64 ".",
                     lastBlock->toString().c_str(), (recovery ? "true" : "false"), gs);
    }
}

void PipelineImpl::resend() {
    assert(stage != PIPELINE_CLOSE);

    for (size_t i = 0; i < packets.size(); ++i) {
        ConstPacketBuffer b = packets[i]->getBuffer();
        sock->writeFully(b.getBuffer(), b.getSize(), writeTimeout);
        int64_t tmp = packets[i]->getLastByteOffsetBlock();
        bytesSent = bytesSent > tmp ? bytesSent : tmp;
    }
}

/*
 * Write one packet into the pipeline, then process an ack if one is already
 * available.
 *
 * Non-heartbeat packets are queued in `packets` until acknowledged. If more than
 * packetPool.getMaxSize() are queued, this first waits for some acks. On an
 * HdfsIOException the pipeline is rebuilt in recovery mode and every queued
 * packet is written again. If the rebuild ends in the PIPELINE_CLOSE stage, the
 * queued last packet of the block is dropped and the function returns.
 */
void PipelineImpl::send(shared_ptr<Packet> packet) {
    ConstPacketBuffer payload = packet->getBuffer();

    /* Heartbeats are not queued for acknowledgement tracking. */
    if (!packet->isHeartbeat()) {
        packets.push_back(packet);
    }

    /* Bound the memory held by unacknowledged packets. */
    if (static_cast<int>(packets.size()) > packetPool.getMaxSize()) {
        waitForAcks(false);
    }

    for (bool recovered = false; ; recovered = true) {
        try {
            if (recovered) {
                /* Replay every unacknowledged packet on the rebuilt pipeline. */
                resend();
            } else {
                assert(sock);
                sock->writeFully(payload.getBuffer(), payload.getSize(),
                                 writeTimeout);
                int64_t lastOffset = packet->getLastByteOffsetBlock();

                if (lastOffset > bytesSent) {
                    bytesSent = lastOffset;
                }
            }

            checkResponse(false);
            return;
        } catch (const HdfsIOException &) {
            /* Blame the first datanode when the failing one is unknown. */
            if (errorIndex < 0) {
                errorIndex = 0;
            }

            sock.reset();
        }

        buildForAppendOrRecovery(true);

        if (stage == PIPELINE_CLOSE) {
            assert(packets.size() == 1 && packets[0]->isLastPacketInBlock());
            packets.clear();
            return;
        }
    }
}

/*
 * Apply one valid pipeline ack.
 *
 * Heartbeat acks are ignored. A successful ack must carry the seqno of the
 * oldest outstanding packet. That packet is then returned to packetPool,
 * bytesAcked and lastBlock's length are advanced, and the socket is dropped if
 * it was the last packet of the block. For a failed ack, errorIndex is set to
 * the last reply that reported an error.
 *
 * @throws HdfsIOException on a seqno mismatch, an ack with no outstanding
 *         packet, or any error reply.
 */
void PipelineImpl::processAck(PipelineAck & ack) {
    assert(!ack.isInvalid());
    int64_t seqno = ack.getSeqno();

    if (HEART_BEAT_SEQNO == seqno) {
        return;
    }

    /*
     * The ack comes off the network: an ack with nothing outstanding is a
     * protocol error, so report it instead of indexing an empty queue.
     */
    if (packets.empty()) {
        THROW(HdfsIOException,
              "processAck: received pipeline ack with seqno %" PRId64 " but no packet is waiting for ack for block %s.",
              seqno, lastBlock->toString().c_str());
    }

    Packet & packet = *packets[0];

    if (ack.isSuccess()) {
        if (packet.getSeqno() != seqno) {
            THROW(HdfsIOException,
                  "processAck: pipeline ack expecting seqno %" PRId64 "  but received %" PRId64 " for block %s.",
                  packet.getSeqno(), seqno, lastBlock->toString().c_str());
        }

        int64_t tmp = packet.getLastByteOffsetBlock();
        bytesAcked = tmp > bytesAcked ? tmp : bytesAcked;
        assert(lastBlock);
        lastBlock->setNumBytes(bytesAcked);

        if (packet.isLastPacketInBlock()) {
            sock.reset();
        }

        packetPool.relesePacket(packets[0]);
        packets.pop_front();
    } else {
        for (int i = ack.getNumOfReplies() - 1; i >= 0; --i) {
            if (Status::DT_PROTO_SUCCESS != ack.getReply(i)) {
                /*
                 * Only record the reply index if it names a node of this pipeline;
                 * otherwise leave errorIndex unchanged so nodes[errorIndex] stays
                 * in range for the callers.
                 */
                if (i >= static_cast<int>(nodes.size())) {
                    THROW(HdfsIOException,
                          "processAck: ack report error at reply %d but the pipeline has only %d nodes for block %s.",
                          i, static_cast<int>(nodes.size()), lastBlock->toString().c_str());
                }

                errorIndex = i;
                /*
                 * handle block token expire as same as HdfsIOException.
                 */
                THROW(HdfsIOException,
                      "processAck: ack report error at node: %s for block %s.",
                      nodes[i].formatAddress().c_str(), lastBlock->toString().c_str());
            }
        }
    }
}

/*
 * Read one length-prefixed pipeline ack from the datanode and apply it.
 *
 * @throws HdfsIOException if the length prefix is negative or the ack is
 *         invalid, plus anything processAck throws.
 */
void PipelineImpl::processResponse() {
    PipelineAck ack;
    std::vector<char> buf;
    int size = reader->readVarint32(readTimeout);

    /*
     * A negative length would make resize() throw std::length_error, which is
     * not an HdfsIOException and would bypass pipeline recovery in the callers.
     */
    if (size < 0) {
        THROW(HdfsIOException,
              "processAllAcks: got an invalid ack length %d for block %s",
              size, lastBlock->toString().c_str());
    }

    ack.reset();
    /* Keep at least one byte so &buf[0] is valid even for an empty message. */
    buf.resize(size > 0 ? size : 1);
    reader->readFully(&buf[0], size, readTimeout);
    ack.readFrom(&buf[0], size);

    if (ack.isInvalid()) {
        THROW(HdfsIOException,
              "processAllAcks: get an invalid DataStreamer packet ack for block %s",
              lastBlock->toString().c_str());
    }

    processAck(ack);
}

/*
 * Process at most one pipeline response.
 *
 * @param wait  if true, poll with readTimeout and throw HdfsIOException when no
 *              response arrives; if false, poll with a timeout of 0 and return
 *              quietly when nothing is readable.
 */
void PipelineImpl::checkResponse(bool wait) {
    const int pollTimeout = wait ? readTimeout : 0;

    if (reader->poll(pollTimeout)) {
        processResponse();
        return;
    }

    if (!wait) {
        return;
    }

    THROW(HdfsIOException, "Timeout when reading response for block %s, datanode %s do not response.",
          lastBlock->toString().c_str(),
          nodes[0].formatAddress().c_str());
}

void PipelineImpl::flush() {
    waitForAcks(true);
}

/*
 * Consume pipeline acks for outstanding packets, rebuilding the pipeline on
 * I/O failure.
 *
 * @param force  when true, keep going until no packet is outstanding; when
 *               false, return as soon as fewer than packetPool.getMaxSize()
 *               packets are outstanding.
 *
 * Each iteration waits for at most one response (checkResponse(true)). On an
 * HdfsIOException the node at errorIndex (node 0 if unknown) is logged, the
 * pipeline is rebuilt in recovery mode, and all unacknowledged packets are
 * written again on the next iteration. If the rebuild ends in the
 * PIPELINE_CLOSE stage, the single remaining packet (the last packet of the
 * block) is dropped and the function returns.
 */
void PipelineImpl::waitForAcks(bool force) {
    /* Set after a pipeline rebuild so the next iteration replays the queue. */
    bool failover = false;

    while (!packets.empty()) {
        /*
         * just wait for some acks in case of consuming too much memory.
         */
        if (!force && static_cast<int>(packets.size()) < packetPool.getMaxSize()) {
            return;
        }

        try {
            if (failover) {
                resend();
            }

            checkResponse(true);
            failover = false;
        } catch (const HdfsIOException & e) {
            /* Blame the first datanode when the failing one is unknown. */
            if (errorIndex < 0) {
                errorIndex = 0;
            }

            std::string buffer;
            LOG(LOG_ERROR,
                "Failed to flush pipeline on datanode %s for block %s file %s.\n%s",
                nodes[errorIndex].formatAddress().c_str(), lastBlock->toString().c_str(),
                path.c_str(), GetExceptionDetail(e, buffer));
            LOG(INFO, "Rebuild pipeline to flush for block %s file %s.", lastBlock->toString().c_str(), path.c_str());
            sock.reset();
            failover = true;
        }

        if (failover) {
            buildForAppendOrRecovery(true);

            if (stage == PIPELINE_CLOSE) {
                assert(packets.size() == 1 && packets[0]->isLastPacketInBlock());
                packets.clear();
                break;
            }
        }
    }
}

/*
 * Finish the current block: drain outstanding acks, send `lastPacket` marked as
 * the last packet of the block in the PIPELINE_CLOSE stage, wait for its ack,
 * and drop the socket.
 *
 * @param lastPacket  packet to send as the block's terminating packet.
 * @return lastBlock with its length set to bytesAcked.
 */
shared_ptr<LocatedBlock> PipelineImpl::close(shared_ptr<Packet> lastPacket) {
    waitForAcks(true);

    lastPacket->setLastPacketInBlock(true);
    stage = PIPELINE_CLOSE;
    send(lastPacket);

    waitForAcks(true);
    sock.reset();

    lastBlock->setNumBytes(bytesAcked);
    LOG(DEBUG2, "close pipeline for file %s, block %s with length %" PRId64,
        path.c_str(), lastBlock->toString().c_str(), lastBlock->getNumBytes());
    return lastBlock;
}

}
}
