/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H
#define GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H

#include <grpc/support/port_platform.h>

#include <grpc/grpc.h>

#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/executor/mpmcqueue.h"

namespace grpc_core {

// Abstract base class for thread pools.
// A thread pool is an executor that maintains a pool of threads sitting around
// and waiting for closures. It also maintains a queue of pending closures:
// when closures appear in the queue, the threads in the pool pull them out and
// execute them.
class ThreadPoolInterface {
 public:
  // Virtual so that an implementation can be destroyed through a pointer to
  // this interface. Implementations are expected to wait for all pending
  // closures to complete and then shut the thread pool down.
  virtual ~ThreadPoolInterface() {}

  // Schedules the given closure for later execution.
  // Depending on the specific subclass implementation, this routine might
  // block the calling thread (when the closure cannot be scheduled right
  // away).
  // The closure should contain a function pointer and the arguments it will
  // take; for details of the closure struct see
  // /grpc/include/grpc/impl/codegen/grpc_types.h
  virtual void Add(grpc_experimental_completion_queue_functor* closure)
      GRPC_ABSTRACT;

  // Returns the current number of pending closures.
  virtual int num_pending_closures() const GRPC_ABSTRACT;

  // Returns the capacity of the pool (number of worker threads in the pool).
  virtual int pool_capacity() const GRPC_ABSTRACT;

  // Returns the thread options used for the threads in this pool.
  virtual const Thread::Options& thread_options() const GRPC_ABSTRACT;

  // Returns the thread name used for the threads in this pool.
  virtual const char* thread_name() const GRPC_ABSTRACT;

  GRPC_ABSTRACT_BASE_CLASS
};

// Worker thread for threadpool. Executes closures in the queue, until getting a
// NULL closure.
class ThreadPoolWorker {
 public:
  ThreadPoolWorker(const char* thd_name, ThreadPoolInterface* pool,
                   MPMCQueueInterface* queue, Thread::Options& options,
                   int index)
      : queue_(queue), thd_name_(thd_name), index_(index) {
    thd_ = Thread(thd_name,
                  [](void* th) { static_cast<ThreadPoolWorker*>(th)->Run(); },
                  this, nullptr, options);
  }

  ~ThreadPoolWorker() {}

  void Start() { thd_.Start(); }
  void Join() { thd_.Join(); }

 private:
  // struct for tracking stats of thread
  struct Stats {
    gpr_timespec sleep_time;
    Stats() { sleep_time = gpr_time_0(GPR_TIMESPAN); }
  };

  void Run();  // Pulls closures from queue and executes them

  MPMCQueueInterface* queue_;  // Queue in thread pool to pull closures from
  Thread thd_;                 // Thread wrapped in
  Stats stats_;                // Stats to be collected in run time
  const char* thd_name_;       // Name of thread
  int index_;                  // Index in thread pool
};

// A fixed-size thread pool implementation of the abstract thread pool
// interface. In this implementation the number of threads in the pool is
// fixed, but the capacity of the closure queue is unlimited.
class ThreadPool : public ThreadPoolInterface {
 public:
  // Creates a thread pool of size "num_threads", with the default thread name
  // "ThreadPoolWorker" and all thread options set to default. If the given
  // size is 0 or less, 1 worker thread will be created inside the pool.
  // Marked explicit so that an int is never silently converted into a pool.
  explicit ThreadPool(int num_threads);

  // Same as the ThreadPool(int num_threads) constructor, except that it also
  // sets "thd_name" as the name of all threads in the thread pool.
  ThreadPool(int num_threads, const char* thd_name);

  // Same as the ThreadPool(int num_threads, const char* thd_name) constructor,
  // except that it also sets "thread_options" for the threads.
  // Notes on stack size:
  // If the stack size field of the passed-in Thread::Options is set to the
  // default value 0, the default ThreadPool stack size will be used. The
  // current default stack size of this implementation is 1952K for mobile
  // platforms and 64K for all others.
  ThreadPool(int num_threads, const char* thd_name,
             const Thread::Options& thread_options);

  // Waits for all pending closures to complete, then shuts down the thread
  // pool.
  ~ThreadPool() override;

  // Adds the given closure to the pending queue immediately. Since the closure
  // queue has infinite length, this routine will not block.
  void Add(grpc_experimental_completion_queue_functor* closure) override;

  // Accessors; see ThreadPoolInterface for the contract of each.
  int num_pending_closures() const override;
  int pool_capacity() const override;
  const Thread::Options& thread_options() const override;
  const char* thread_name() const override;

 private:
  int num_threads_ = 0;                   // Number of worker threads
  const char* thd_name_ = nullptr;        // Name shared by all worker threads
  Thread::Options thread_options_;        // Options used for worker threads
  ThreadPoolWorker** threads_ = nullptr;  // Array of worker threads
  MPMCQueueInterface* queue_ = nullptr;   // Closure queue

  Atomic<bool> shut_down_{false};  // Destructor has been called if set to true

  // Construction logic shared by all constructors.
  void SharedThreadPoolConstructor();
  // For ThreadPool, the default stack size is 1952K for mobile platforms and
  // 64K for other platforms.
  size_t DefaultStackSize();
  // Internal use only, for debug checking.
  void AssertHasNotBeenShutDown();
};

}  // namespace grpc_core

#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H */
