/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_
#define _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_

#include "Atomic.h"
#include "DateTime.h"
#include "ExceptionInternal.h"
#include "IpcConnectionContext.pb.h"
#include "Memory.h"
#include "network/BufferedSocketReader.h"
#include "network/TcpSocket.h"
#include "RpcCall.h"
#include "RpcChannelKey.h"
#include "RpcHeader.pb.h"
#include "RpcRemoteCall.h"
#include "SaslClient.h"
#include "Thread.h"
#include "Unordered.h"

#include <google/protobuf/message.h>

namespace Hdfs {
namespace Internal {

class RpcClient;

/**
 * Abstract interface of a rpc channel.
 * Every operation is pure virtual; RpcChannelImpl, declared later in this
 * header, derives from this class.
 */
class RpcChannel {
public:
    /**
     * Destroy a channel.
     * The destructor is virtual so that a channel can be destroyed through
     * a pointer to RpcChannel.
     */
    virtual ~RpcChannel() {
    }

    /**
     * Called when the caller has finished its rpc call.
     * The channel may be reused later if immediate is false.
     * @param immediate If true, the channel must not be reused any more.
     */
    virtual void close(bool immediate) = 0;

    /**
     * Invoke a rpc call.
     * This function returns nothing.
     * NOTE(review): the outcome is presumably reported through call or by
     * throwing an exception -- confirm against the implementation.
     * @param call The call to be invoked.
     */
    virtual void invoke(const RpcCall & call) = 0;

    /**
     * Close the channel if its idle time has expired.
     * @return true if the channel's idle time has expired.
     */
    virtual bool checkIdle() = 0;

    /**
     * Wait until all references to this channel have exited.
     * The channel cannot be reused any more.
     * @pre RpcClient is not running.
     */
    virtual void waitForExit() = 0;

    /**
     * Add one to the reference count of this channel.
     */
    virtual void addRef() = 0;
};

/**
 * RpcChannelImpl represents a rpc connection to the server.
 * It implements the RpcChannel interface.
 */
class RpcChannelImpl: public RpcChannel {
public:
    /**
     * Construct a RpcChannelImpl instance.
     * @param k The key of this channel.
     * @param c The RpcClient instance.
     */
    RpcChannelImpl(const RpcChannelKey & k, RpcClient & c);

    /**
     * Destroy a RpcChannelImpl instance.
     */
    ~RpcChannelImpl();

    /**
     * Called when the caller has finished its rpc call.
     * The channel may be reused later if immediate is false.
     * @param immediate If true, the channel must not be reused any more.
     */
    void close(bool immediate);

    /**
     * Invoke a rpc call.
     * This function returns nothing.
     * NOTE(review): the outcome is presumably reported through call or by
     * throwing an exception -- confirm against the implementation.
     * @param call The call to be invoked.
     */
    void invoke(const RpcCall & call);

    /**
     * Close the channel if its idle time has expired.
     * @return true if the channel's idle time has expired.
     */
    bool checkIdle();

    /**
     * Wait until all references to this channel have exited.
     * The channel cannot be reused any more.
     * @pre RpcClient is not running.
     */
    void waitForExit();

    /**
     * Add one to the reference count of this channel (the refs member).
     */
    void addRef() {
        ++refs;
    }

private:
    /**
     * Set up the RPC connection.
     * @pre The caller already holds the write lock.
     */
    void connect();

    /**
     * Clean up all pending calls.
     * @param reason The reason to cancel the calls.
     * @pre The caller already holds the write lock.
     */
    void cleanupPendingCalls(exception_ptr reason);

    /**
     * Send the rpc connection protocol header.
     * @param auth NOTE(review): presumably the authentication settings used
     *             for this connection -- confirm against the implementation.
     * @throw HdfsNetworkException
     * @throw HdfsTimeout
     */
    void sendConnectionHeader(const RpcAuth& auth);

    /**
     * Send the rpc connection protocol content.
     * @param auth NOTE(review): presumably the authentication settings used
     *             for this connection -- confirm against the implementation.
     */
    void sendConnectionContent(const RpcAuth & auth);

    /**
     * Build the rpc connection context.
     * @param connectionContext The context message, taken by non-const
     *             reference. NOTE(review): presumably filled in by this
     *             function -- confirm.
     * @param auth NOTE(review): presumably the authentication settings used
     *             for this connection -- confirm against the implementation.
     */
    void buildConnectionContext(IpcConnectionContextProto & connectionContext, const RpcAuth & auth);

    /**
     * Send a ping packet to the server.
     * @throw HdfsNetworkException
     * @throw HdfsTimeout
     * @pre The caller should hold the write lock.
     */
    void sendPing();

    /**
     * Send the call message to the rpc server.
     * @param remote The remote call.
     * @pre The caller already holds the write lock.
     */
    void sendRequest(RpcRemoteCallPtr remote);

    /**
     * Issue a rpc call and check the response.
     * All recoverable errors are caught in this function.
     *
     * @param remote The remote call.
     * @return An exception_ptr. NOTE(review): presumably it holds the caught
     *         error, and is empty on success -- confirm against the implementation.
     */
    exception_ptr invokeInternal(RpcRemoteCallPtr remote);

    /**
     * Check for a response, blocking until one response is received.
     * @pre The channel already holds the read lock.
     */
    void checkOneResponse();

    /**
     * Read and handle one response.
     * @param writeLock NOTE(review): presumably tells whether the caller
     *                  already holds the write lock -- confirm against the
     *                  implementation.
     * @pre The channel already holds the read lock.
     */
    void readOneResponse(bool writeLock);

    /**
     * Get the call object with the given id, and then remove it from the pending call list.
     * @param id The id of the call object to be returned.
     * @return The call object with the given id.
     * @throw HdfsIOException
     * @pre The channel is already locked.
     */
    RpcRemoteCallPtr getPendingCall(int32_t id);

    /**
     * Check if there is data available for reading on the socket.
     * @return true if a response is available.
     */
    bool getResponse();

    /**
     * Wake up one caller to check for a response.
     * @param id The id of the call which the current caller handled.
     */
    void wakeupOneCaller(int32_t id);

    /**
     * Shut down the RPC connection because of an error.
     * @param reason The reason to cancel the calls.
     * @pre The caller already holds the write lock.
     */
    void shutdown(exception_ptr reason);

    /**
     * Create the SASL client.
     * NOTE(review): presumably picks one entry of auths, sets up the
     * saslClient member for it and returns the chosen entry -- confirm
     * against the implementation.
     * @param auths The list of RpcSaslProto_SaslAuth entries to choose from.
     * @return A pointer to a RpcSaslProto_SaslAuth.
     */
    const RpcSaslProto_SaslAuth * createSaslClient(
        const ::google::protobuf::RepeatedPtrField<RpcSaslProto_SaslAuth> * auths);

    /**
     * Send a SASL message.
     * @param msg The SASL message to send.
     * @param resp NOTE(review): presumably receives the reply to msg --
     *             confirm against the implementation.
     */
    void sendSaslMessage(RpcSaslProto * msg, ::google::protobuf::Message * resp);

    /**
     * Evaluate the SASL token of a response.
     * @param response The SASL response message.
     * @param serverIsDone NOTE(review): presumably true when the server has
     *                     finished the SASL negotiation -- confirm.
     * @return A string. NOTE(review): presumably the token to send back to
     *         the server -- confirm against the implementation.
     */
    std::string saslEvaluateToken(RpcSaslProto & response, bool serverIsDone);

    /**
     * Set up the SASL connection.
     * @return A RpcAuth. NOTE(review): presumably the authentication
     *         settings resulting from the negotiation -- confirm.
     */
    RpcAuth setupSaslConnection();

private:
    /**
     * Construct a RpcChannelImpl instance for test.
     * @param key The key of this channel.
     * @param sock The socket instance.
     * @param in The BufferedSocketReader instance built on sock.
     * @param client The RpcClient instance.
     */
    RpcChannelImpl(const RpcChannelKey & key, Socket * sock,
                   BufferedSocketReader * in, RpcClient & client);

private:
    // Reference count of this channel, incremented by addRef().
    atomic<int> refs;
    // NOTE(review): presumably tells whether the channel can still be used -- confirm.
    bool available;
    // NOTE(review): presumably the "read lock" named in the preconditions above -- confirm.
    mutex readMut;
    // NOTE(review): presumably the "write lock" named in the preconditions above -- confirm.
    mutex writeMut;
    // The key of this channel.
    RpcChannelKey key;
    // The RpcClient instance, held by reference.
    RpcClient & client;
    // The BufferedSocketReader instance; the test constructor documents it as built on sock.
    shared_ptr<BufferedSocketReader> in;
    // The SASL client. NOTE(review): presumably only set when SASL is used -- confirm.
    shared_ptr<SaslClient> saslClient;
    // The socket instance.
    shared_ptr<Socket> sock;
    steady_clock::time_point lastActivity; // A ping is a kind of activity; lastActivity is updated after a ping.
    steady_clock::time_point lastIdle; // A ping cannot change the idle state. If there are still pending calls, lastIdle is always "NOW".
    // The pending calls, looked up by an int32_t key. NOTE(review): presumably the call id used by getPendingCall() -- confirm.
    unordered_map<int32_t, RpcRemoteCallPtr> pendingCalls;
};

}
}

#endif /* _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_ */
