/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#if !defined(WIN32) && !defined(__APPLE__)
#include <sys/prctl.h>
#endif
#include "ConsumeMsgService.h"
#include "DefaultMQPushConsumer.h"
#include "Logging.h"
#include "UtilAll.h"
namespace rocketmq {

//<!************************************************************************
// Builds the concurrent consume service and starts its worker threads.
//
// consumer     consumer used later for send-back and offset updates; only the
//              pointer is stored.
// threadCount  number of worker threads to start; values <= 0 start none.
// msgListener  listener invoked for every consumed batch; only the pointer is
//              stored.
//
// Each worker runs m_ioService.run(); m_ioServiceWork is bound to the same
// io_service in the initializer list.
ConsumeMessageConcurrentlyService::ConsumeMessageConcurrentlyService(
    MQConsumer* consumer, int threadCount, MQMessageListener* msgListener)
    : m_pConsumer(consumer),
      m_pMessageListener(msgListener),
      m_ioServiceWork(m_ioService) {
#if !defined(WIN32) && !defined(__APPLE__)
  // Temporarily rename the calling thread to "ConsumeTP" while the workers are
  // created (presumably so the new threads inherit that name), then restore
  // the saved name below.
  string taskName = UtilAll::getProcessName();
  prctl(PR_SET_NAME, "ConsumeTP", 0, 0, 0);
#endif
  // Use '<' rather than '!=' so a negative threadCount starts no threads
  // instead of spawning threads without bound.
  for (int i = 0; i < threadCount; ++i) {
    m_threadpool.create_thread(
        boost::bind(&boost::asio::io_service::run, &m_ioService));
  }
#if !defined(WIN32) && !defined(__APPLE__)
  prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
#endif
}

// Destructor: the listener and consumer pointers are only reset here, not
// deleted.
ConsumeMessageConcurrentlyService::~ConsumeMessageConcurrentlyService() {
  m_pMessageListener = NULL;
  m_pConsumer = NULL;
}

// Intentionally empty: the worker threads are already created in the
// constructor, so there is nothing left to start here.
void ConsumeMessageConcurrentlyService::start() {
}

// Shuts the service down by stopping the worker thread pool.
void ConsumeMessageConcurrentlyService::shutdown() {
  stopThreadPool();
}

// Stops the io_service and then waits for every worker thread to finish.
void ConsumeMessageConcurrentlyService::stopThreadPool() {
  // Stop first so the run() calls in the workers return ...
  m_ioService.stop();

  // ... then block until all of them have exited.
  m_threadpool.join_all();
}

// Returns the listener type reported by the registered message listener.
// The listener pointer is dereferenced without a NULL check.
MessageListenerType
ConsumeMessageConcurrentlyService::getConsumeMsgSerivceListenerType() {
  const MessageListenerType listenerType =
      m_pMessageListener->getMessageListenerType();
  return listenerType;
}

// Queues a consume task for the given pull request on the io_service; one of
// the worker threads will later run ConsumeRequest(request, msgs).
// boost::bind stores copies of its arguments, so the queued task works on its
// own copy of msgs, while request is captured as a raw pointer.
void ConsumeMessageConcurrentlyService::submitConsumeRequest(
    PullRequest* request, vector<MQMessageExt>& msgs) {
  m_ioService.post(
      boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest,
                  this,
                  request,
                  msgs));
}

// Consumes one batch of pulled messages and reports the outcome.
//
// request  pull request the batch belongs to; a NULL or dropped request is
//          logged and ignored (a dropped one additionally has its messages
//          cleared).
// msgs     the batch to hand to the listener; an empty batch is logged and
//          ignored.
//
// Flow:
//   1. Rewrite retry topics and invoke the listener (if one is registered).
//   2. Translate the listener status into an ack index.
//   3. BROADCASTING: only log a failed batch. CLUSTERING: send every
//      non-acknowledged message back via the consumer.
//   4. Remove the batch from the request and update the consume offset when
//      removeMessage() returns a non-negative offset.
void ConsumeMessageConcurrentlyService::ConsumeRequest(
    PullRequest* request, vector<MQMessageExt>& msgs) {
  // Handle NULL separately: there is nothing to clear on a NULL request, and
  // dereferencing it (as the combined check used to do) would crash.
  if (request == NULL) {
    LOG_WARN("the pull result is NULL or Had been dropped");
    return;
  }

  if (request->isDroped()) {
    LOG_WARN("the pull result is NULL or Had been dropped");
    request->clearAllMsgs();  // add clear operation to avoid bad state when
                              // dropped pullRequest returns normal
    return;
  }

  //<! read data;
  if (msgs.empty()) {
    LOG_WARN("the msg of pull result is NULL,its mq:%s",
             (request->m_messageQueue).toString().c_str());
    return;
  }

  // Without a listener the batch is treated as successfully consumed.
  ConsumeStatus status = CONSUME_SUCCESS;
  if (m_pMessageListener != NULL) {
    resetRetryTopic(msgs);
    request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
    status = m_pMessageListener->consumeMessage(msgs);
  }

  /*LOG_DEBUG("Consumed MSG size:%d of mq:%s",
      msgs.size(), (request->m_messageQueue).toString().c_str());*/
  // ackIndex == msgs.size() means the whole batch is acknowledged; -1 means
  // nothing is acknowledged. Any other status keeps the -1 default.
  int ackIndex = -1;
  switch (status) {
    case CONSUME_SUCCESS:
      ackIndex = static_cast<int>(msgs.size());
      break;
    case RECONSUME_LATER:
      ackIndex = -1;
      break;
    default:
      break;
  }

  switch (m_pConsumer->getMessageModel()) {
    case BROADCASTING:
      // Note: broadcasting reconsume should do by application, as it has big
      // affect to broker cluster
      if (ackIndex != static_cast<int>(msgs.size()))
        LOG_WARN("BROADCASTING, the message consume failed, drop it:%s",
                 (request->m_messageQueue).toString().c_str());
      break;
    case CLUSTERING:
      // send back msg to broker;
      // Starts at 0 when nothing was acknowledged and past the end (no
      // iterations) when the whole batch was acknowledged.
      for (size_t i = ackIndex + 1; i < msgs.size(); i++) {
        LOG_WARN("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT
                 ", reconsume "
                 "times is:%d",
                 (request->m_messageQueue).toString().c_str(),
                 msgs[i].getMsgId().c_str(), i, msgs[i].getReconsumeTimes());
        m_pConsumer->sendMessageBack(msgs[i], 0);
      }
      break;
    default:
      break;
  }

  // update offset
  int64 offset = request->removeMessage(msgs);
  // LOG_DEBUG("update offset:%lld of mq: %s",
  // offset,(request->m_messageQueue).toString().c_str());
  if (offset >= 0) {
    m_pConsumer->updateConsumeOffset(request->m_messageQueue, offset);
  } else {
    LOG_WARN("Note: accumulation consume occurs on mq:%s",
             (request->m_messageQueue).toString().c_str());
  }
}

// Restores the topic stored in PROPERTY_RETRY_TOPIC on messages whose current
// topic equals this consumer group's retry topic.
// Messages without that property, or carrying any other topic, are left
// untouched.
void ConsumeMessageConcurrentlyService::resetRetryTopic(
    vector<MQMessageExt>& msgs) {
  string groupRetryTopic = UtilAll::getRetryTopic(m_pConsumer->getGroupName());

  for (vector<MQMessageExt>::iterator it = msgs.begin(); it != msgs.end();
       ++it) {
    string originTopic = it->getProperty(MQMessage::PROPERTY_RETRY_TOPIC);
    if (originTopic.empty()) {
      continue;  // no retry-topic property recorded on this message
    }
    if (groupRetryTopic.compare(it->getTopic()) != 0) {
      continue;  // current topic is not this group's retry topic
    }
    it->setTopic(originTopic);
  }
}

//<!***************************************************************************
}  //<!end namespace;
