/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "Exception.h"
#include "ExceptionInternal.h"
#include "Function.h"
#include "SessionConfig.h"

#include <sstream>

#define ARRAYSIZE(A) (sizeof(A) / sizeof(A[0]))

namespace Hdfs {
namespace Internal {

template<typename T>
static void CheckRangeGE(const char * key, T const & value, T const & target) {
    if (!(value >= target)) {
        std::stringstream ss;
        ss.imbue(std::locale::classic());
        ss << "Invalid configure item: \"" << key << "\", value: " << value
           << ", expected value should be larger than " << target;
        THROW(HdfsConfigInvalid, "%s", ss.str().c_str());
    }
}

template<typename T>
static void CheckMultipleOf(const char * key, const T & value, int unit) {
    if (value <= 0 || value % unit != 0) {
        THROW(HdfsConfigInvalid, "%s should be larger than 0 and be the multiple of %d.", key, unit);
    }
}

SessionConfig::SessionConfig(const Config & conf) {
    ConfigDefault<bool> boolValues [] = {
        {
            &rpcTcpNoDelay, "rpc.client.connect.tcpnodelay", true
        }, {
            &readFromLocal, "dfs.client.read.shortcircuit", true
        }, {
            &addDatanode, "output.replace-datanode-on-failure", true
        }, {
            &notRetryAnotherNode, "input.notretry-another-node", false
        }, {
            &useMappedFile, "input.localread.mappedfile", false
        }, {
            &legacyLocalBlockReader, "dfs.client.use.legacy.blockreader.local", false
        }
    };
    ConfigDefault<int32_t> i32Values[] = {
        {
            &rpcMaxIdleTime, "rpc.client.max.idle", 10 * 1000, bind(CheckRangeGE<int32_t>, _1, _2, 1)
        }, {
            &rpcPingTimeout, "rpc.client.ping.interval", 10 * 1000
        }, {
            &rpcConnectTimeout, "rpc.client.connect.timeout", 600 * 1000
        }, {
            &rpcReadTimeout, "rpc.client.read.timeout", 3600 * 1000
        }, {
            &rpcWriteTimeout, "rpc.client.write.timeout", 3600 * 1000
        }, {
            &rpcSocketLingerTimeout, "rpc.client.socekt.linger.timeout", -1
        }, {
            &rpcMaxRetryOnConnect, "rpc.client.connect.retry", 10, bind(CheckRangeGE<int32_t>, _1, _2, 1)
        }, {
            &rpcTimeout, "rpc.client.timeout", 3600 * 1000
        }, {
            &defaultReplica, "dfs.default.replica", 3, bind(CheckRangeGE<int32_t>, _1, _2, 1)
        }, {
            &inputConnTimeout, "input.connect.timeout", 600 * 1000
        }, {
            &inputReadTimeout, "input.read.timeout", 3600 * 1000
        }, {
            &inputWriteTimeout, "input.write.timeout", 3600 * 1000
        }, {
            &localReadBufferSize, "input.localread.default.buffersize", 1 * 1024 * 1024, bind(CheckRangeGE<int32_t>, _1, _2, 1)
        }, {
            &prefetchSize, "dfs.prefetchsize", 10, bind(CheckRangeGE<int32_t>, _1, _2, 1)
        }, {
            &maxGetBlockInfoRetry, "input.read.getblockinfo.retry", 3, bind(CheckRangeGE<int32_t>, _1, _2, 1)
        }, {
            &maxLocalBlockInfoCacheSize, "input.localread.blockinfo.cachesize", 1000, bind(CheckRangeGE<int32_t>, _1, _2, 1)
        }, {
            &maxReadBlockRetry, "input.read.max.retry", 60, bind(CheckRangeGE<int32_t>, _1, _2, 1)
        }, {
            &chunkSize, "output.default.chunksize", 512, bind(CheckMultipleOf<int32_t>, _1, _2, 512)
        }, {
            &packetSize, "output.default.packetsize", 64 * 1024
        }, {
            &blockWriteRetry, "output.default.write.retry", 10, bind(CheckRangeGE<int32_t>, _1, _2, 1)
        }, {
            &outputConnTimeout, "output.connect.timeout", 600 * 1000
        }, {
            &outputReadTimeout, "output.read.timeout", 3600 * 1000
        }, {
            &outputWriteTimeout, "output.write.timeout", 3600 * 1000
        }, {
            &closeFileTimeout, "output.close.timeout", 3600 * 1000
        }, {
            &packetPoolSize, "output.packetpool.size", 1024
        }, {
            &heartBeatInterval, "output.heeartbeat.interval", 10 * 1000
        }, {
            &rpcMaxHARetry, "dfs.client.failover.max.attempts", 15, bind(CheckRangeGE<int32_t>, _1, _2, 0)
        }, {
            &maxFileDescriptorCacheSize, "dfs.client.read.shortcircuit.streams.cache.size", 256, bind(CheckRangeGE<int32_t>, _1, _2, 0)
        }, {
            &socketCacheExpiry, "dfs.client.socketcache.expiryMsec", 3000, bind(CheckRangeGE<int32_t>, _1, _2, 0)
        }, {
            &socketCacheCapacity, "dfs.client.socketcache.capacity", 16, bind(CheckRangeGE<int32_t>, _1, _2, 0)
        }, {
            &cryptoBufferSize, "hadoop.security.crypto.buffer.size", 8192
        }, {
            &httpRequestRetryTimes, "kms.send.request.retry.times", 0
        }
    };
    ConfigDefault<int64_t> i64Values [] = {
        {
            &defaultBlockSize, "dfs.default.blocksize", 64 * 1024 * 1024, bind(CheckMultipleOf<int64_t>, _1, _2, 512)
        },
        {
            &curlTimeout, "kms.send.request.timeout", 20L
        }
    };

    ConfigDefault<std::string> strValues [] = {
        {&defaultUri, "dfs.default.uri", "hdfs://localhost:8020" },
        {&rpcAuthMethod, "hadoop.security.authentication", "simple" },
        {&kerberosCachePath, "hadoop.security.kerberos.ticket.cache.path", "" },
        {&logSeverity, "dfs.client.log.severity", "INFO" },
        {&domainSocketPath, "dfs.domain.socket.path", ""},
        {&kmsUrl, "dfs.encryption.key.provider.uri", "" },
        {&kmsAuthMethod, "hadoop.kms.authentication.type", "simple" }
    };

    for (size_t i = 0; i < ARRAYSIZE(boolValues); ++i) {
        *boolValues[i].variable = conf.getBool(boolValues[i].key,
                                               boolValues[i].value);

        if (boolValues[i].check) {
            boolValues[i].check(boolValues[i].key, *boolValues[i].variable);
        }
    }

    for (size_t i = 0; i < ARRAYSIZE(i32Values); ++i) {
        *i32Values[i].variable = conf.getInt32(i32Values[i].key,
                                               i32Values[i].value);

        if (i32Values[i].check) {
            i32Values[i].check(i32Values[i].key, *i32Values[i].variable);
        }
    }

    for (size_t i = 0; i < ARRAYSIZE(i64Values); ++i) {
        *i64Values[i].variable = conf.getInt64(i64Values[i].key,
                                               i64Values[i].value);

        if (i64Values[i].check) {
            i64Values[i].check(i64Values[i].key, *i64Values[i].variable);
        }
    }

    for (size_t i = 0; i < ARRAYSIZE(strValues); ++i) {
        *strValues[i].variable = conf.getString(strValues[i].key,
                                                strValues[i].value);

        if (strValues[i].check) {
            strValues[i].check(strValues[i].key, *strValues[i].variable);
        }
    }
}

}
}
