/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "BigEndian.h"
#include "datatransfer.pb.h"
#include "Exception.h"
#include "ExceptionInternal.h"
#include "HWCrc32c.h"
#include "LocalBlockReader.h"
#include "SWCrc32c.h"

#include <inttypes.h>
#include <limits>

/* Version value the meta file header must carry; the constructor rejects any other value. */
#define BMVERSION 1
/* Number of bytes taken by the version field; it is read as a big-endian 16-bit value at offset 0. */
#define BMVERSION_SIZE 2

/*
 * Total size in bytes of the meta file header: version, checksum type and
 * bytes-per-checksum fields. The checksums themselves start at this offset
 * (see the meta seek in skip()).
 */
#define HEADER_SIZE (BMVERSION_SIZE + CHECKSUM_TYPE_SIZE + CHECKSUM_BYTES_PER_CHECKSUM_SIZE)

namespace Hdfs {
namespace Internal {

/**
 * Construct a reader over the local data and meta files of a block.
 *
 * The meta file header is read and validated, the checksum implementation is
 * selected, the local buffer size is rounded up to a whole number of checksum
 * chunks when verification is active, and finally the reader is positioned at
 * the requested offset.
 *
 * @param info Short circuit read information providing the data and meta files.
 * @param block The block to read; its byte count bounds the readable length.
 * @param offset Number of bytes to skip from the start of the block.
 * @param verify Whether the caller wants checksums verified. Verification is
 *               turned off when the meta file declares a NULL checksum.
 * @param conf Session configuration, source of the local read buffer size.
 * @param buffer Caller supplied storage used for buffered data reads.
 * @throw HdfsCanceled Rethrown unchanged.
 * @throw HdfsIOException On any other HdfsException, including a version
 *        mismatch, an unsupported or unknown checksum type, or an invalid
 *        bytes-per-checksum value.
 */
LocalBlockReader::LocalBlockReader(const shared_ptr<ReadShortCircuitInfo>& info,
                                   const ExtendedBlock& block, int64_t offset,
                                   bool verify, SessionConfig& conf,
                                   std::vector<char>& buffer)
    : verify(verify),
      pbuffer(NULL),
      pMetaBuffer(NULL),
      block(block),
      checksumSize(0),
      chunkSize(0),
      position(0),
      size(0),
      cursor(0),
      length(block.getNumBytes()),
      info(info),
      buffer(buffer) {
    try {
        metaFd = info->getMetaFile();
        dataFd = info->getDataFile();

        std::vector<char> header;
        pMetaBuffer = metaFd->read(header, HEADER_SIZE);
        int16_t version = ReadBigEndian16FromArray(&pMetaBuffer[0]);

        if (BMVERSION != version) {
            THROW(HdfsIOException,
                  "LocalBlockReader get an unmatched block, expected block version %d, real version is %d",
                  BMVERSION, static_cast<int>(version));
        }

        switch (pMetaBuffer[BMVERSION_SIZE]) {
        case ChecksumTypeProto::CHECKSUM_NULL:
            /*
             * nothing to verify against, so disable verification
             * and drop the meta file.
             */
            this->verify = false;
            checksumSize = 0;
            metaFd.reset();
            break;

        case ChecksumTypeProto::CHECKSUM_CRC32:
            THROW(HdfsIOException,
                  "LocalBlockReader does not support CRC32 checksum.");
            break;

        case ChecksumTypeProto::CHECKSUM_CRC32C:
            if (HWCrc32c::available()) {
                checksum = shared_ptr<Checksum>(new HWCrc32c());
            } else {
                checksum = shared_ptr<Checksum>(new SWCrc32c());
            }

            chunkSize = ReadBigEndian32FromArray(
                            &pMetaBuffer[BMVERSION_SIZE + CHECKSUM_TYPE_SIZE]);
            checksumSize = sizeof(int32_t);
            break;

        default:
            THROW(HdfsIOException,
                  "LocalBlockReader cannot recognize checksum type: %d.",
                  static_cast<int>(pMetaBuffer[BMVERSION_SIZE]));
        }

        /*
         * the header was read through a vector local to this constructor,
         * do not keep the pointer once parsing is done.
         * NOTE(review): presumably read() may return a pointer into the
         * supplied vector -- confirm against the file wrapper.
         */
        pMetaBuffer = NULL;

        /*
         * The parameter "verify" hides the member of the same name. From here
         * on the member must be consulted, otherwise the CHECKSUM_NULL case
         * above has no effect and a block without checksum is rejected
         * whenever the caller asked for verification.
         */
        if (this->verify && chunkSize <= 0) {
            THROW(HdfsIOException,
                  "LocalBlockReader get an invalid checksum parameter, bytes per check: %d.",
                  chunkSize);
        }

        localBufferSize = conf.getLocalReadBufferSize();

        if (this->verify) {
            /*
             * round up to a multiple of the chunk size so that a full
             * buffer always holds whole checksum chunks.
             */
            localBufferSize = (localBufferSize + chunkSize - 1) / chunkSize * chunkSize;
        }

        if (offset > 0) {
            skip(offset);
        }
    } catch (const HdfsCanceled & e) {
        throw;
    } catch (const HdfsException & e) {
        NESTED_THROW(HdfsIOException,
                     "Failed to construct LocalBlockReader for block: %s.",
                     block.toString().c_str());
    }
}

/* The destructor body is intentionally empty. */
LocalBlockReader::~LocalBlockReader() {}

/**
 * Read bufferSize bytes of data together with the matching checksums and
 * verify every chunk.
 *
 * Must only be called while verification is enabled and while the cursor
 * sits on a chunk boundary (both are asserted).
 *
 * @param bufferSize Number of data bytes to read; the last chunk may be
 *                   shorter than chunkSize.
 * @throw ChecksumException If the computed checksum of any chunk differs
 *        from the value stored in the meta file.
 */
void LocalBlockReader::readAndVerify(int32_t bufferSize) {
    assert(true == verify);
    assert(cursor % chunkSize == 0);

    /* number of chunks touched, counting a trailing partial chunk. */
    const int numChunks = (bufferSize + chunkSize - 1) / chunkSize;

    pbuffer = dataFd->read(buffer, bufferSize);
    pMetaBuffer = metaFd->read(metaBuffer, numChunks * checksumSize);

    for (int idx = 0; idx < numChunks; ++idx) {
        /*
         * every chunk is chunkSize bytes long, except a final one
         * which runs past the end of the data just read.
         */
        int span = chunkSize;

        if (chunkSize * (idx + 1) > bufferSize) {
            span = bufferSize % chunkSize;
        }

        checksum->reset();
        checksum->update(&pbuffer[idx * chunkSize], span);

        /* stored checksums are big-endian 32-bit values, one per chunk. */
        const uint32_t expected = ReadBigEndian32FromArray(
                                      &pMetaBuffer[idx * checksumSize]);

        if (expected != checksum->getValue()) {
            THROW(ChecksumException,
                  "LocalBlockReader checksum not match for block: %s",
                  block.toString().c_str());
        }
    }
}

/**
 * Copy up to len bytes of block data into buf.
 *
 * Data still held in the local buffer is served first. Otherwise the request
 * is clamped to the bytes left in the block and either copied straight from
 * the data file (only without verification) or served after refilling the
 * local buffer.
 *
 * @param buf Destination for the data.
 * @param len Maximum number of bytes to copy.
 * @return The number of bytes copied, which may be less than len; 0 when
 *         no bytes are left in the block.
 */
int32_t LocalBlockReader::readInternal(char * buf, int32_t len) {
    int32_t want = len;

    /*
     * serve the request from the buffer while it still holds data.
     */
    if (position < size) {
        if (!(want < size - position)) {
            want = size - position;
        }

        memcpy(buf, &pbuffer[position], want);
        cursor += want;
        position += want;
        return want;
    }

    /*
     * never read past the end of the block.
     */
    if (!(want < length - cursor)) {
        want = length - cursor;
    }

    if (want == 0) {
        return 0;
    }

    /*
     * without verification, a request larger than the buffer or one that
     * reaches the end of the block is copied directly into the caller's
     * memory.
     */
    if (!verify) {
        const bool largerThanBuffer = want > localBufferSize;
        const bool reachesBlockEnd = want == length - cursor;

        if (largerThanBuffer || reachesBlockEnd) {
            dataFd->copy(buf, want);
            cursor += want;
            return want;
        }
    }

    /*
     * refill the buffer, limited by what is left in the block.
     */
    int fill = localBufferSize;

    if (!(fill < length - cursor)) {
        fill = length - cursor;
    }

    assert(fill > 0);

    if (!verify) {
        pbuffer = dataFd->read(buffer, fill);
    } else {
        readAndVerify(fill);
    }

    size = fill;
    position = 0;
    assert(position < size);

    /* the buffer is not empty now, so this call takes the first branch. */
    return readInternal(buf, want);
}

/**
 * Read up to size bytes of block data into buf.
 *
 * Thin wrapper around readInternal() which adds error handling: on failure
 * the short circuit info is marked invalid and the error is rethrown with
 * the current position attached.
 *
 * @param buf Destination for the data.
 * @param size Maximum number of bytes to read.
 * @return The number of bytes read as reported by readInternal().
 * @throw HdfsCanceled Rethrown unchanged.
 * @throw HdfsIOException Wrapping any other HdfsException.
 */
int32_t LocalBlockReader::read(char * buf, int32_t size) {
    try {
        const int32_t done = readInternal(buf, size);
        return done;
    } catch (const HdfsCanceled & e) {
        /* cancellation is not a read failure, pass it through untouched. */
        throw;
    } catch (const HdfsException & e) {
        info->setValid(false);
        NESTED_THROW(HdfsIOException,
                     "LocalBlockReader failed to read from position: %" PRId64 ", length: %d, block: %s.",
                     cursor, size, block.toString().c_str());
    }

    /* every path above either returns or throws. */
    assert(!"cannot reach here");
    return 0;
}

/**
 * Advance the read position by len bytes without handing data to the caller.
 *
 * Buffered data is consumed first. Without verification the cursor simply
 * jumps to the target. With verification the cursor moves to the start of
 * the checksum chunk containing the target, the meta file is positioned on
 * that chunk's checksum, and the rest of the distance is covered by
 * refilling and verifying the buffer.
 *
 * @param len Number of bytes to skip; asserted to be less than the number
 *            of bytes left in the block.
 * @throw HdfsCanceled Rethrown unchanged.
 * @throw HdfsIOException Wrapping any other HdfsException; the short circuit
 *        info is marked invalid first.
 */
void LocalBlockReader::skip(int64_t len) {
    assert(len < length - cursor);

    try {
        int64_t todo = len;

        while (todo > 0) {
            /*
             * skip the data in buffer.
             */
            if (size - position > 0) {
                int batch = todo < size - position ? todo : size - position;
                position += batch;
                todo -= batch;
                cursor += batch;
                continue;
            }

            if (verify) {
                /*
                 * checksums cover whole chunks, so move to the start of the
                 * chunk holding the target and keep the offset inside that
                 * chunk as the remaining distance.
                 */
                int64_t lastChunkSize = (cursor + todo) % chunkSize;
                cursor = (cursor + todo) / chunkSize * chunkSize;
                int64_t metaCursor = HEADER_SIZE
                                     + checksumSize * (cursor / chunkSize);
                metaFd->seek(metaCursor);
                todo = lastChunkSize;
            } else {
                cursor += todo;
                todo = 0;
            }

            if (cursor > 0) {
                dataFd->seek(cursor);
            }

            /*
             * fill buffer again and verify checksum
             */
            if (todo > 0) {
                assert(true == verify);
                int bufferSize = localBufferSize;
                bufferSize =
                    bufferSize < length - cursor ?
                    bufferSize : length - cursor;
                readAndVerify(bufferSize);
                position = 0;
                size = bufferSize;
            }
        }
    } catch (const HdfsCanceled & e) {
        throw;
    } catch (const HdfsException & e) {
        info->setValid(false);
        /*
         * report the requested skip length; the member "size" is the amount
         * of data in the local buffer, not the length of this request.
         */
        NESTED_THROW(HdfsIOException,
                     "LocalBlockReader failed to skip from position: %" PRId64 ", length: %" PRId64 ", block: %s.",
                     cursor, len, block.toString().c_str());
    }
}

}
}
