/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// Copyright (c) 2011, François Saint-Jacques
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//     * Redistributions of source code must retain the above copyright
//       notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//     * Neither the name of the disruptor-- nor the
//       names of its contributors may be used to endorse or promote products
//       derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#ifndef DISRUPTOR_SEQUENCER_H_ // NOLINT
#define DISRUPTOR_SEQUENCER_H_ // NOLINT

#include <vector>

#include "batch_descriptor.h"
#include "claim_strategy.h"
#include "interface.h"
#include "sequence_barrier.h"
#include "wait_strategy.h"

namespace rocketmq {

// Coordinator for claiming sequences for access to a data structures while
// tracking dependent {@link Sequence}s
class Sequencer: public boost::noncopyable {
 public:
    // Construct a Sequencer with the selected strategies.
    //
    // @param buffer_size over which sequences are valid.
    // @param claim_strategy_option for those claiming sequences.
    // @param wait_strategy_option for those waiting on sequences.
    Sequencer(int buffer_size,
              ClaimStrategyOption claim_strategy_option,
              WaitStrategyOption wait_strategy_option) :
            buffer_size_(buffer_size),
            claim_strategy_(CreateClaimStrategy(claim_strategy_option,
                                                buffer_size_)),
            wait_strategy_(CreateWaitStrategy(wait_strategy_option)) { }

    ~Sequencer() {
        delete claim_strategy_;
        delete wait_strategy_;
    }

    // Set the sequences that will gate publishers to prevent the buffer
    // wrapping.
    //
    // @param sequences to be gated on.
    void set_gating_sequences(
            const std::vector<Sequence*>& sequences) {
        gating_sequences_ = sequences;
    }

    // Create a {@link SequenceBarrier} that gates on the cursor and a list of
    // {@link Sequence}s.
    //
    // @param sequences_to_track this barrier will track.
    // @return the barrier gated as required.
    ProcessingSequenceBarrier* NewBarrier(
            const std::vector<Sequence*>& sequences_to_track) {
        return new ProcessingSequenceBarrier(wait_strategy_, &cursor_,
                                             sequences_to_track);
    }

    // Create a new {@link BatchDescriptor} that is the minimum of the
    // requested size and the buffer_size.
    //
    // @param size for the new batch.
    // @return the new {@link BatchDescriptor}.
    BatchDescriptor* NewBatchDescriptor(const int& size) {
        return new BatchDescriptor(size<buffer_size_?size:buffer_size_);
    }

    // The capacity of the data structure to hold entries.
    //
    // @return capacity of the data structure.
    int buffer_size() { return buffer_size_; }


    // Get the value of the cursor indicating the published sequence.
    //
    // @return value of the cursor for events that have been published.
    int64_t GetCursor() { return cursor_.sequence(); }

    // Has the buffer capacity left to allocate another sequence. This is a
    // concurrent method so the response should only be taken as an indication
    // of available capacity.
    //
    // @return true if the buffer has the capacity to allocated another event.
    bool HasAvalaibleCapacity() {
        return claim_strategy_->HasAvalaibleCapacity(gating_sequences_);
    }

    // Claim the next event in sequence for publishing to the {@link RingBuffer}.
    //
    // @return the claimed sequence.
    int64_t Next() {
        return claim_strategy_->IncrementAndGet(gating_sequences_);
    }

    // Claim the next batch of sequence numbers for publishing.
    //
    // @param batch_descriptor to be updated for the batch range.
    // @return the updated batch_descriptor.
    BatchDescriptor* Next(BatchDescriptor* batch_descriptor) {
        int64_t sequence = claim_strategy_->IncrementAndGet(batch_descriptor->size(), gating_sequences_);
        batch_descriptor->set_end(sequence);
        return batch_descriptor;
    }

    // Claim a specific sequence when only one publisher is involved.
    //
    // @param sequence to be claimed.
    // @return sequence just claime.
    int64_t Claim(const int64_t& sequence) {
        claim_strategy_->SetSequence(sequence, gating_sequences_);
        return sequence;
    }

    // Publish an event and make it visible to {@link EventProcessor}s.
    //
    // @param sequence to be published.
    void Publish(const int64_t& sequence) {
        Publish(sequence, 1);
    }

    // Publish the batch of events in sequence.
    //
    // @param sequence to be published.
    void Publish(const BatchDescriptor& batch_descriptor) {
        Publish(batch_descriptor.end(), batch_descriptor.size());
    }

    // Force the publication of a cursor sequence.
    //
    // Only use this method when forcing a sequence and you are sure only one
    // publisher exists. This will cause the cursor to advance to this
    // sequence.
    //
    // @param sequence to which is to be forced for publication.
    void ForcePublish(const int64_t& sequence) {
        cursor_.set_sequence(sequence);
        wait_strategy_->SignalAllWhenBlocking();
    }

    // TODO(fsaintjacques): This was added to overcome
    // NoOpEventProcessor::GetSequence(), this is not a clean solution.
    Sequence* GetSequencePtr() {
        return &cursor_;
    }

 private:
    // Helpers
    void Publish(const int64_t& sequence, const int64_t& batch_size) {
        //LOG_DEBUG("publish sequence:%d", sequence);
        claim_strategy_->SerialisePublishing(sequence, cursor_, batch_size);
        cursor_.set_sequence(sequence);
        wait_strategy_->SignalAllWhenBlocking();
    }

    // Members
    const int buffer_size_;

    PaddedSequence cursor_;
    std::vector<Sequence*> gating_sequences_;

    ClaimStrategyInterface* claim_strategy_;
    WaitStrategyInterface* wait_strategy_;

};

};  // namespace rocketmq

#endif // DISRUPTOR_RING_BUFFER_H_ NOLINT
