/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/executor/mpmcqueue.h"

namespace grpc_core {

// Trace flag consulted by the queue code in this file: the statistics
// bookkeeping and the accompanying log lines in Put(), Get() and PopFront()
// only run while GRPC_TRACE_FLAG_ENABLED() reports it as enabled.
// Constructed with `false` and the name "thread_pool" (presumably the default
// state and the tracer name -- confirm against the TraceFlag declaration).
DebugOnlyTraceFlag grpc_thread_pool_trace(false, "thread_pool");

inline void* InfLenFIFOQueue::PopFront() {
  // Caller should already check queue is not empty and has already held the
  // mutex. This function will assume that there is at least one element in the
  // queue (i.e. queue_head_->content is valid).
  void* result = queue_head_->content;
  count_.Store(count_.Load(MemoryOrder::RELAXED) - 1, MemoryOrder::RELAXED);

  // Updates Stats when trace flag turned on.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
    gpr_timespec wait_time =
        gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), queue_head_->insert_time);
    stats_.num_completed++;
    stats_.total_queue_time = gpr_time_add(stats_.total_queue_time, wait_time);
    stats_.max_queue_time = gpr_time_max(
        gpr_convert_clock_type(stats_.max_queue_time, GPR_TIMESPAN), wait_time);

    if (count_.Load(MemoryOrder::RELAXED) == 0) {
      stats_.busy_queue_time =
          gpr_time_add(stats_.busy_queue_time,
                       gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), busy_time));
    }

    gpr_log(GPR_INFO,
            "[InfLenFIFOQueue PopFront] num_completed:        %" PRIu64
            " total_queue_time: %f max_queue_time:   %f busy_queue_time:   %f",
            stats_.num_completed,
            gpr_timespec_to_micros(stats_.total_queue_time),
            gpr_timespec_to_micros(stats_.max_queue_time),
            gpr_timespec_to_micros(stats_.busy_queue_time));
  }

  queue_head_ = queue_head_->next;
  // Signal waiting thread
  if (count_.Load(MemoryOrder::RELAXED) > 0) {
    TopWaiter()->cv.Signal();
  }

  return result;
}

// Allocates one contiguous chunk of `num` nodes with gpr_zalloc(), adds `num`
// to num_nodes_, and links neighbouring nodes of the chunk to each other in
// both directions.
//
// The two outer links (new_chunk[0].prev and new_chunk[num - 1].next) are not
// set here; the caller is responsible for splicing the chunk into the ring.
//
// num: number of nodes to allocate; expected to be positive.
// Returns a pointer to the first node of the new chunk. The caller keeps
// ownership and must eventually release it with gpr_free().
InfLenFIFOQueue::Node* InfLenFIFOQueue::AllocateNodes(int num) {
  num_nodes_ = num_nodes_ + num;
  Node* new_chunk = static_cast<Node*>(gpr_zalloc(sizeof(Node) * num));
  // Link every adjacent pair (i - 1, i). Walking pairs instead of
  // special-casing the first and last node never indexes outside [0, num):
  // a chunk of one node keeps both links untouched rather than pointing at
  // new_chunk[-1] / new_chunk[1], and nothing is written when num is 0.
  // For num >= 2 the resulting links are exactly the same as before.
  for (int i = 1; i < num; ++i) {
    new_chunk[i - 1].next = &new_chunk[i];
    new_chunk[i].prev = &new_chunk[i - 1];
  }
  return new_chunk;
}

// Builds an empty queue: an empty waiter list, a delete list with room for
// kDeleteListInitSize chunk pointers, and a ring of kQueueInitNumNodes nodes
// with head and tail both on the first node.
InfLenFIFOQueue::InfLenFIFOQueue() {
  // An empty waiter list is the sentinel linked to itself.
  waiters_.prev = &waiters_;
  waiters_.next = &waiters_;

  // Every allocated chunk is remembered here so the destructor can free it.
  delete_list_size_ = kDeleteListInitSize;
  delete_list_ =
      static_cast<Node**>(gpr_zalloc(sizeof(Node*) * delete_list_size_));

  Node* const first_chunk = AllocateNodes(kQueueInitNumNodes);
  delete_list_[delete_list_count_] = first_chunk;
  ++delete_list_count_;

  // AllocateNodes() links only the interior of the chunk; join its two ends
  // here to close the ring.
  Node* const ring_front = &first_chunk[0];
  Node* const ring_back = &first_chunk[kQueueInitNumNodes - 1];
  ring_front->prev = ring_back;
  ring_back->next = ring_front;

  queue_head_ = first_chunk;
  queue_tail_ = first_chunk;
}

// Releases every node chunk recorded in the delete list, then the delete list
// itself. The queue must be empty when it is destroyed; this is asserted.
InfLenFIFOQueue::~InfLenFIFOQueue() {
  GPR_ASSERT(count_.Load(MemoryOrder::RELAXED) == 0);
  Node** const chunks_end = delete_list_ + delete_list_count_;
  for (Node** chunk = delete_list_; chunk != chunks_end; ++chunk) {
    gpr_free(*chunk);
  }
  gpr_free(delete_list_);
}

void InfLenFIFOQueue::Put(void* elem) {
  MutexLock l(&mu_);

  int curr_count = count_.Load(MemoryOrder::RELAXED);

  if (queue_tail_ == queue_head_ && curr_count != 0) {
    // List is full. Expands list to double size by inserting new chunk of nodes
    Node* new_chunk = AllocateNodes(curr_count);
    delete_list_[delete_list_count_++] = new_chunk;
    // Expands delete list on full.
    if (delete_list_count_ == delete_list_size_) {
      delete_list_size_ = delete_list_size_ * 2;
      delete_list_ = static_cast<Node**>(
          gpr_realloc(delete_list_, sizeof(Node*) * delete_list_size_));
    }
    new_chunk[0].prev = queue_tail_->prev;
    new_chunk[curr_count - 1].next = queue_head_;
    queue_tail_->prev->next = new_chunk;
    queue_head_->prev = &new_chunk[curr_count - 1];
    queue_tail_ = new_chunk;
  }
  queue_tail_->content = static_cast<void*>(elem);

  // Updates Stats info
  if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
    stats_.num_started++;
    gpr_log(GPR_INFO, "[InfLenFIFOQueue Put] num_started:        %" PRIu64,
            stats_.num_started);
    auto current_time = gpr_now(GPR_CLOCK_MONOTONIC);
    if (curr_count == 0) {
      busy_time = current_time;
    }
    queue_tail_->insert_time = current_time;
  }

  count_.Store(curr_count + 1, MemoryOrder::RELAXED);
  queue_tail_ = queue_tail_->next;

  TopWaiter()->cv.Signal();
}

// Removes the oldest element from the queue and returns it, blocking the
// calling thread while the queue is empty.
//
// wait_time: optional out-parameter (may be nullptr). It is written only when
//   this call actually had to block and the thread_pool trace flag was enabled
//   at the moment blocking started; it then receives the time spent blocked.
//   In every other case it is left untouched.
// Returns the element that was stored by the matching Put().
void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
  MutexLock l(&mu_);

  if (count_.Load(MemoryOrder::RELAXED) == 0) {
    // Decide once whether this wait is measured. Evaluating the trace flag a
    // second time after the wait would read an uninitialized start_time if the
    // flag got enabled while this thread was blocked.
    const bool record_wait =
        GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) && wait_time != nullptr;
    gpr_timespec start_time = gpr_timespec();
    if (record_wait) {
      start_time = gpr_now(GPR_CLOCK_MONOTONIC);
    }

    // Register on the waiter list so Put()/PopFront() can signal this thread,
    // and re-check the count after every wake-up: the loop only exits once an
    // element is actually available.
    Waiter self;
    PushWaiter(&self);
    do {
      self.cv.Wait(&mu_);
    } while (count_.Load(MemoryOrder::RELAXED) == 0);
    RemoveWaiter(&self);

    if (record_wait) {
      *wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time);
    }
  }
  GPR_DEBUG_ASSERT(count_.Load(MemoryOrder::RELAXED) > 0);
  return PopFront();
}

// Links `waiter` into the waiter list directly behind the sentinel waiters_,
// which makes it the waiter that TopWaiter() returns.
void InfLenFIFOQueue::PushWaiter(Waiter* waiter) {
  Waiter* const previous_top = waiters_.next;
  waiter->prev = &waiters_;
  waiter->next = previous_top;
  previous_top->prev = waiter;
  waiters_.next = waiter;
}

// Unlinks `waiter` from the waiter list by joining its two neighbours. The
// waiter's own prev/next fields are left as they were. The sentinel waiters_
// must never be passed in (checked in debug builds).
void InfLenFIFOQueue::RemoveWaiter(Waiter* waiter) {
  GPR_DEBUG_ASSERT(waiter != &waiters_);
  Waiter* const before = waiter->prev;
  Waiter* const after = waiter->next;
  after->prev = before;
  before->next = after;
}

// Returns the waiter linked directly behind the sentinel waiters_, i.e. the
// one pushed most recently. When no waiter is linked, this is the sentinel
// itself.
InfLenFIFOQueue::Waiter* InfLenFIFOQueue::TopWaiter() {
  Waiter* const top = waiters_.next;
  return top;
}

}  // namespace grpc_core
