#!/usr/bin/env escript
%% -*- erlang -*-
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

%% Escript entry point.
%%
%% Sets up the code path, announces 155 planned etap assertions and runs
%% test/0. When test/0 evaluates to 'ok' the etap run is ended normally; any
%% other value (including what 'catch' produces for an exception) is printed
%% as a diagnostic and handed to etap:bail/1.
main(_) ->
    test_util:init_code_path(),
    etap:plan(155),

    Outcome = (catch test()),
    case Outcome of
        ok ->
            etap:end_tests();
        _ ->
            Report = io_lib:format("Test died abnormally: ~p", [Outcome]),
            etap:diag(Report),
            etap:bail(Outcome)
    end,
    ok.


%% Starts the crypto application (asserting that it answers 'ok') and then
%% runs every work queue scenario, one after the other, in a fixed order.
%% Returns 'ok' once all scenarios have run.
test() ->
    ok = crypto:start(),
    Scenarios = [
        fun() -> test_single_consumer_max_item_count() end,
        fun() -> test_single_consumer_max_size() end,
        fun() -> test_single_consumer_max_item_count_and_size() end,
        fun() -> test_multiple_consumers() end
    ],
    lists:foreach(fun(Scenario) -> Scenario() end, Scenarios),
    ok.


%% Scenario: queue created with {max_items, 3}, one producer helper process
%% and one consumer helper process.
%%
%% The helpers are driven with produce/2, consume/2, ping/1 and
%% last_consumer_items/1. A helper that is stuck inside a couch_work_queue
%% call cannot answer ping/1, so "blocked" is asserted as ping/1 returning
%% 'timeout' (which takes the full 3 seconds) and "not blocked" as 'ok'.
%% The integer passed to produce/2 is the size in bytes of the random item.
test_single_consumer_max_item_count() ->
    etap:diag("Spawning a queue with 3 max items, 1 producer and 1 consumer"),

    {ok, Q} = couch_work_queue:new([{max_items, 3}]),
    Producer = spawn_producer(Q),
    Consumer = spawn_consumer(Q),

    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),

    %% Dequeue from an empty queue: the consumer is expected to block.
    consume(Consumer, 1),
    etap:is(ping(Consumer), timeout,
        "Consumer blocked when attempting to dequeue 1 item from empty queue"),

    %% The first produced item is expected to go to the waiting consumer.
    Item1 = produce(Producer, 10),
    etap:is(ping(Producer), ok, "Producer not blocked"),

    etap:is(ping(Consumer), ok, "Consumer unblocked"),
    etap:is(last_consumer_items(Consumer), {ok, [Item1]},
        "Consumer received the right item"),

    %% No dequeue is pending now, so items accumulate in the queue.
    Item2 = produce(Producer, 20),
    etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),

    Item3 = produce(Producer, 15),
    etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),

    %% Third queued item reaches max_items: the producer is expected to block.
    Item4 = produce(Producer, 3),
    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
    etap:is(ping(Producer), timeout, "Producer blocked with full queue"),

    consume(Consumer, 2),
    etap:is(ping(Consumer), ok,
        "Consumer not blocked when attempting to dequeue 2 items from queue"),
    etap:is(last_consumer_items(Consumer), {ok, [Item2, Item3]},
        "Consumer received the right items"),
    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),

    %% Asking for 2 items when only 1 is queued is expected to return that
    %% single item instead of blocking.
    consume(Consumer, 2),
    etap:is(ping(Consumer), ok,
        "Consumer not blocked when attempting to dequeue 2 items from queue"),
    etap:is(last_consumer_items(Consumer), {ok, [Item4]},
        "Consumer received the right item"),
    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),

    consume(Consumer, 100),
    etap:is(ping(Consumer), timeout,
        "Consumer blocked when attempting to dequeue 100 items from empty queue"),
    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),

    %% The consumer is still waiting on its dequeue of 100 items, so the item
    %% count is expected to stay at 0 after this produce.
    Item5 = produce(Producer, 11),
    etap:is(ping(Producer), ok, "Producer not blocked with empty queue"),
    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),

    Item6 = produce(Producer, 19),
    etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),

    Item7 = produce(Producer, 2),
    etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),

    Item8 = produce(Producer, 33),
    etap:is(ping(Producer), timeout, "Producer blocked with full queue"),
    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),

    %% The pending dequeue of 100 items is expected to have returned with
    %% Item5 only; Item6..Item8 remain queued.
    etap:is(ping(Consumer), ok, "Consumer unblocked"),
    etap:is(last_consumer_items(Consumer), {ok, [Item5]},
        "Consumer received the first queued item"),
    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),

    consume(Consumer, all),
    etap:is(ping(Consumer), ok,
        "Consumer not blocked when attempting to dequeue all items from queue"),
    etap:is(last_consumer_items(Consumer), {ok, [Item6, Item7, Item8]},
        "Consumer received all queued items"),

    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),

    %% Closing an empty queue is expected to succeed right away; afterwards
    %% dequeue, item_count and size are all expected to answer 'closed'.
    etap:is(close_queue(Q), ok, "Closed queue"),
    consume(Consumer, 1),
    etap:is(last_consumer_items(Consumer), closed, "Consumer got closed queue"),
    etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
    etap:is(couch_work_queue:size(Q), closed, "Queue closed"),

    stop(Producer, "producer"),
    stop(Consumer, "consumer").



%% Scenario: queue created with {max_size, 160}, one producer helper process
%% and one consumer helper process.
%%
%% "Blocked" is asserted as ping/1 returning 'timeout' (the helper cannot
%% answer while it is inside a couch_work_queue call) and "not blocked" as
%% 'ok'. The integer passed to produce/2 is the item size in bytes, which is
%% what couch_work_queue:size/1 is expected to add up.
test_single_consumer_max_size() ->
    etap:diag("Spawning a queue with max size of 160 bytes, "
        "1 producer and 1 consumer"),

    {ok, Q} = couch_work_queue:new([{max_size, 160}]),
    Producer = spawn_producer(Q),
    Consumer = spawn_consumer(Q),

    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),

    %% Dequeue from an empty queue: the consumer is expected to block until
    %% the first item is produced.
    consume(Consumer, 1),
    etap:is(ping(Consumer), timeout,
        "Consumer blocked when attempting to dequeue 1 item from empty queue"),

    Item1 = produce(Producer, 50),
    etap:is(ping(Producer), ok, "Producer not blocked"),

    etap:is(ping(Consumer), ok, "Consumer unblocked"),
    etap:is(last_consumer_items(Consumer), {ok, [Item1]},
        "Consumer received the right item"),

    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),

    Item2 = produce(Producer, 50),
    etap:is(ping(Producer), ok, "Producer not blocked"),
    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
    etap:is(couch_work_queue:size(Q), 50, "Queue size is 50 bytes"),

    Item3 = produce(Producer, 50),
    etap:is(ping(Producer), ok, "Producer not blocked"),
    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
    etap:is(couch_work_queue:size(Q), 100, "Queue size is 100 bytes"),

    %% 100 + 61 = 161 bytes exceeds max_size. The item is still expected to
    %% be counted in the queue, but the producer is expected to block.
    Item4 = produce(Producer, 61),
    etap:is(ping(Producer), timeout, "Producer blocked"),
    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
    etap:is(couch_work_queue:size(Q), 161, "Queue size is 161 bytes"),

    consume(Consumer, 1),
    etap:is(ping(Consumer), ok,
        "Consumer not blocked when attempting to dequeue 1 item from full queue"),
    etap:is(last_consumer_items(Consumer), {ok, [Item2]},
        "Consumer received the right item"),
    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
    etap:is(couch_work_queue:size(Q), 111, "Queue size is 111 bytes"),

    %% produce/2 returning here also shows the producer left its blocked
    %% queue call after the dequeue above.
    Item5 = produce(Producer, 20),
    etap:is(ping(Producer), ok, "Producer not blocked"),
    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
    etap:is(couch_work_queue:size(Q), 131, "Queue size is 131 bytes"),

    Item6 = produce(Producer, 40),
    etap:is(ping(Producer), timeout, "Producer blocked"),
    etap:is(couch_work_queue:item_count(Q), 4, "Queue item count is 4"),
    etap:is(couch_work_queue:size(Q), 171, "Queue size is 171 bytes"),

    %% A close request on a non-empty queue is expected not to terminate the
    %% queue process within close_queue/1's 3 second wait.
    etap:is(close_queue(Q), timeout,
        "Timeout when trying to close non-empty queue"),

    consume(Consumer, 2),
    etap:is(ping(Consumer), ok,
        "Consumer not blocked when attempting to dequeue 2 items from full queue"),
    etap:is(last_consumer_items(Consumer), {ok, [Item3, Item4]},
        "Consumer received the right items"),
    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
    etap:is(couch_work_queue:size(Q), 60, "Queue size is 60 bytes"),

    etap:is(close_queue(Q), timeout,
        "Timeout when trying to close non-empty queue"),

    %% Draining the remaining items is expected to let the earlier close
    %% requests take effect: the queue then reports 'closed'.
    consume(Consumer, all),
    etap:is(ping(Consumer), ok,
        "Consumer not blocked when attempting to dequeue all items from queue"),
    etap:is(last_consumer_items(Consumer), {ok, [Item5, Item6]},
        "Consumer received the right items"),

    etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
    etap:is(couch_work_queue:size(Q), closed, "Queue closed"),

    consume(Consumer, all),
    etap:is(last_consumer_items(Consumer), closed, "Consumer got closed queue"),

    stop(Producer, "producer"),
    stop(Consumer, "consumer").


%% Scenario: queue created with both {max_items, 3} and {max_size, 200}, one
%% producer helper process and one consumer helper process.
%%
%% "Blocked" is asserted as ping/1 returning 'timeout' (the helper cannot
%% answer while it is inside a couch_work_queue call) and "not blocked" as
%% 'ok'. The integer passed to produce/2 is the item size in bytes.
test_single_consumer_max_item_count_and_size() ->
    etap:diag("Spawning a queue with 3 max items, max size of 200 bytes, "
        "1 producer and 1 consumer"),

    {ok, Q} = couch_work_queue:new([{max_items, 3}, {max_size, 200}]),
    Producer = spawn_producer(Q),
    Consumer = spawn_consumer(Q),

    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),

    Item1 = produce(Producer, 100),
    etap:is(ping(Producer), ok, "Producer not blocked"),
    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
    etap:is(couch_work_queue:size(Q), 100, "Queue size is 100 bytes"),

    %% First limit hit: size (210 bytes > 200) with only 2 items queued.
    Item2 = produce(Producer, 110),
    etap:is(ping(Producer), timeout,
        "Producer blocked when queue size >= max_size"),
    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
    etap:is(couch_work_queue:size(Q), 210, "Queue size is 210 bytes"),

    consume(Consumer, all),
    etap:is(ping(Consumer), ok,
        "Consumer not blocked when attempting to dequeue all items from queue"),
    etap:is(last_consumer_items(Consumer), {ok, [Item1, Item2]},
        "Consumer received the right items"),
    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),

    etap:is(ping(Producer), ok, "Producer not blocked anymore"),

    Item3 = produce(Producer, 10),
    etap:is(ping(Producer), ok, "Producer not blocked"),
    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
    etap:is(couch_work_queue:size(Q), 10, "Queue size is 10 bytes"),

    Item4 = produce(Producer, 4),
    etap:is(ping(Producer), ok, "Producer not blocked"),
    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
    etap:is(couch_work_queue:size(Q), 14, "Queue size is 14 bytes"),

    %% Second limit hit: item count (3 items) with only 16 bytes queued.
    Item5 = produce(Producer, 2),
    etap:is(ping(Producer), timeout,
        "Producer blocked when queue item count = max_items"),
    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
    etap:is(couch_work_queue:size(Q), 16, "Queue size is 16 bytes"),

    consume(Consumer, 1),
    etap:is(ping(Consumer), ok,
        "Consumer not blocked when attempting to dequeue 1 item from queue"),
    etap:is(last_consumer_items(Consumer), {ok, [Item3]},
       "Consumer received 1 item"),
    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
    etap:is(couch_work_queue:size(Q), 6, "Queue size is 6 bytes"),

    %% A close request on a non-empty queue is expected not to terminate the
    %% queue process within close_queue/1's 3 second wait.
    etap:is(close_queue(Q), timeout,
        "Timeout when trying to close non-empty queue"),

    consume(Consumer, 1),
    etap:is(ping(Consumer), ok,
        "Consumer not blocked when attempting to dequeue 1 item from queue"),
    etap:is(last_consumer_items(Consumer), {ok, [Item4]},
       "Consumer received 1 item"),
    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
    etap:is(couch_work_queue:size(Q), 2, "Queue size is 2 bytes"),

    %% The queue has already been asked to close but still holds an item;
    %% it is expected to keep accepting new items in that state.
    Item6 = produce(Producer, 50),
    etap:is(ping(Producer), ok,
        "Producer not blocked when queue is not full and already received"
        " a close request"),
    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
    etap:is(couch_work_queue:size(Q), 52, "Queue size is 52 bytes"),

    %% Draining the queue is expected to let the earlier close request take
    %% effect: the queue then reports 'closed'.
    consume(Consumer, all),
    etap:is(ping(Consumer), ok,
        "Consumer not blocked when attempting to dequeue all items from queue"),
    etap:is(last_consumer_items(Consumer), {ok, [Item5, Item6]},
       "Consumer received all queued items"),

    etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
    etap:is(couch_work_queue:size(Q), closed, "Queue closed"),

    consume(Consumer, 1),
    etap:is(last_consumer_items(Consumer), closed, "Consumer got closed queue"),

    stop(Producer, "producer"),
    stop(Consumer, "consumer").


%% Scenario: queue created with {max_items, 3}, {max_size, 200} and
%% {multi_workers, true}, one producer helper process and three consumer
%% helper processes.
%%
%% "Blocked" is asserted as ping/1 returning 'timeout' (the helper cannot
%% answer while it is inside a couch_work_queue call) and "not blocked" as
%% 'ok'. Throughout this scenario every produced item is expected to be
%% handed to a waiting consumer, so the queue's item count and size are
%% expected to stay at 0.
test_multiple_consumers() ->
    etap:diag("Spawning a queue with 3 max items, max size of 200 bytes, "
        "1 producer and 3 consumers"),

    {ok, Q} = couch_work_queue:new(
        [{max_items, 3}, {max_size, 200}, {multi_workers, true}]),
    Producer = spawn_producer(Q),
    Consumer1 = spawn_consumer(Q),
    Consumer2 = spawn_consumer(Q),
    Consumer3 = spawn_consumer(Q),

    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),

    %% Park all three consumers on the empty queue, in order 1, 2, 3.
    consume(Consumer1, 1),
    etap:is(ping(Consumer1), timeout,
        "Consumer 1 blocked when attempting to dequeue 1 item from empty queue"),
    consume(Consumer2, 2),
    etap:is(ping(Consumer2), timeout,
        "Consumer 2 blocked when attempting to dequeue 2 items from empty queue"),
    consume(Consumer3, 1),
    etap:is(ping(Consumer3), timeout,
        "Consumer 3 blocked when attempting to dequeue 1 item from empty queue"),

    Item1 = produce(Producer, 50),
    etap:is(ping(Producer), ok, "Producer not blocked"),
    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),

    Item2 = produce(Producer, 50),
    etap:is(ping(Producer), ok, "Producer not blocked"),
    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),

    Item3 = produce(Producer, 50),
    etap:is(ping(Producer), ok, "Producer not blocked"),
    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),

    %% Items are expected to have been handed out in the order the consumers
    %% started waiting: Item1 -> consumer 1, Item2 -> consumer 2 (even though
    %% it asked for 2 items), Item3 -> consumer 3.
    etap:is(ping(Consumer1), ok, "Consumer 1 unblocked"),
    etap:is(last_consumer_items(Consumer1), {ok, [Item1]},
       "Consumer 1 received 1 item"),
    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),

    etap:is(ping(Consumer2), ok, "Consumer 2 unblocked"),
    etap:is(last_consumer_items(Consumer2), {ok, [Item2]},
       "Consumer 2 received 1 item"),
    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),

    etap:is(ping(Consumer3), ok, "Consumer 3 unblocked"),
    etap:is(last_consumer_items(Consumer3), {ok, [Item3]},
       "Consumer 3 received 1 item"),
    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),

    %% Second round: park all three consumers again.
    consume(Consumer1, 1),
    etap:is(ping(Consumer1), timeout,
        "Consumer 1 blocked when attempting to dequeue 1 item from empty queue"),
    consume(Consumer2, 2),
    %% The description says "2 items" to match the request made just above.
    etap:is(ping(Consumer2), timeout,
        "Consumer 2 blocked when attempting to dequeue 2 items from empty queue"),
    consume(Consumer3, 1),
    etap:is(ping(Consumer3), timeout,
        "Consumer 3 blocked when attempting to dequeue 1 item from empty queue"),

    Item4 = produce(Producer, 50),
    etap:is(ping(Producer), ok, "Producer not blocked"),
    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),

    %% The queue holds no items, so closing is expected to succeed right
    %% away. Consumer 1 already got Item4; the two consumers still waiting
    %% are expected to be released with 'closed'.
    etap:is(close_queue(Q), ok, "Closed queue"),

    etap:is(ping(Consumer1), ok, "Consumer 1 unblocked"),
    etap:is(last_consumer_items(Consumer1), {ok, [Item4]},
       "Consumer 1 received 1 item"),

    etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
    etap:is(couch_work_queue:size(Q), closed, "Queue closed"),

    etap:is(ping(Consumer2), ok, "Consumer 2 unblocked"),
    etap:is(last_consumer_items(Consumer2), closed,
        "Consumer 2 received 'closed' atom"),

    etap:is(ping(Consumer3), ok, "Consumer 3 unblocked"),
    etap:is(last_consumer_items(Consumer3), closed,
        "Consumer 3 received 'closed' atom"),

    stop(Producer, "producer"),
    stop(Consumer1, "consumer 1"),
    stop(Consumer2, "consumer 2"),
    stop(Consumer3, "consumer 3").


%% Asks the queue process Q to close and waits up to 3 seconds for it to
%% terminate.
%%
%% Returns 'ok' once a 'DOWN' message for Q has been received, or 'timeout'
%% if Q is still alive after 3 seconds.
%%
%% The 'ok' result is spelled out rather than borrowed from etap:diag/1, so
%% callers comparing against 'ok' do not depend on that function's return
%% value. On timeout the monitor is removed with the 'flush' option so a
%% 'DOWN' message that raced with the timeout cannot linger in the caller's
%% mailbox.
close_queue(Q) ->
    ok = couch_work_queue:close(Q),
    MonRef = erlang:monitor(process, Q),
    receive
    {'DOWN', MonRef, process, Q, _Reason} ->
        etap:diag("Queue closed"),
        ok
    after 3000 ->
        erlang:demonitor(MonRef, [flush]),
        timeout
    end.


%% Spawns an unlinked consumer helper process for queue Q and returns its
%% pid. The helper sends its replies to the calling process and starts with
%% 'nil' as its last dequeue result.
spawn_consumer(Q) ->
    Owner = self(),
    Loop = fun() -> consumer_loop(Owner, Q, nil) end,
    spawn(Loop).


%% Receive loop of a consumer helper process.
%%
%% Owner is the pid that replies are sent to, Q is the work queue and
%% LastResult is whatever the most recent couch_work_queue:dequeue/2 call
%% returned ('nil' when started by spawn_consumer/1).
%%
%% Messages handled:
%%   {consume, N}      - call couch_work_queue:dequeue(Q, N) and keep the
%%                       result; no reply is sent. Other messages are not
%%                       looked at until dequeue/2 returns.
%%   {last_item, Ref}  - reply {item, Ref, LastResult}
%%   {ping, Ref}       - reply {pong, Ref}
%%   {stop, Ref}       - reply {ok, Ref} and leave the loop
consumer_loop(Owner, Q, LastResult) ->
    receive
    {consume, Count} ->
        consumer_loop(Owner, Q, couch_work_queue:dequeue(Q, Count));
    {last_item, Tag} ->
        Owner ! {item, Tag, LastResult},
        consumer_loop(Owner, Q, LastResult);
    {ping, Tag} ->
        Owner ! {pong, Tag},
        consumer_loop(Owner, Q, LastResult);
    {stop, Tag} ->
        Owner ! {ok, Tag}
    end.


%% Spawns an unlinked producer helper process for queue Q and returns its
%% pid. The helper sends its replies to the calling process.
spawn_producer(Q) ->
    Owner = self(),
    Loop = fun() -> producer_loop(Owner, Q) end,
    spawn(Loop).


%% Receive loop of a producer helper process.
%%
%% Parent is the pid that replies are sent to and Q is the work queue.
%%
%% Messages handled:
%%   {stop, Ref}           - reply {ok, Ref} and leave the loop
%%   {ping, Ref}           - reply {pong, Ref}
%%   {produce, Ref, Size}  - generate Size random bytes, reply
%%                           {item, Ref, Item}, then add Item to Q
%%
%% Random data comes from crypto:strong_rand_bytes/1; the formerly used
%% crypto:rand_bytes/1 is deprecated and no longer exists in OTP 20 and
%% later.
producer_loop(Parent, Q) ->
    receive
    {stop, Ref} ->
        Parent ! {ok, Ref};
    {ping, Ref} ->
        Parent ! {pong, Ref},
        producer_loop(Parent, Q);
    {produce, Ref, Size} ->
        Item = crypto:strong_rand_bytes(Size),
        %% Report the item before queueing it: the tests need the item even
        %% in the cases where they expect this process to stay blocked in
        %% the queue/2 call below.
        Parent ! {item, Ref, Item},
        ok = couch_work_queue:queue(Q, Item),
        producer_loop(Parent, Q)
    end.


%% Asynchronously asks a consumer helper process to dequeue N items (N is
%% passed through untouched, so it may also be the atom 'all'). Does not
%% wait for the dequeue; evaluates to the message that was sent.
consume(Consumer, N) ->
    Request = {consume, N},
    Consumer ! Request.


%% Asks a consumer helper process for the result of its most recent dequeue.
%% Returns whatever the consumer reports, or 'timeout' when no reply arrives
%% within 3 seconds.
last_consumer_items(Consumer) ->
    Tag = make_ref(),
    Consumer ! {last_item, Tag},
    receive
    {item, Tag, LastResult} -> LastResult
    after 3000 -> timeout
    end.


%% Asks a producer helper process to create and enqueue an item of Size
%% bytes. Returns the item the producer reports back; if no report arrives
%% within 3 seconds the test run is aborted through etap:bail/1.
produce(Producer, Size) ->
    Tag = make_ref(),
    Producer ! {produce, Tag, Size},
    receive
    {item, Tag, Produced} -> Produced
    after 3000 -> etap:bail("Timeout asking producer to produce an item")
    end.


%% Liveness probe for a helper process: sends {ping, Ref} and waits for the
%% matching {pong, Ref}. Returns 'ok' when the reply arrives, or 'timeout'
%% when nothing comes back within 3 seconds.
ping(Pid) ->
    Tag = make_ref(),
    Pid ! {ping, Tag},
    receive
    {pong, Tag} -> ok
    after 3000 -> timeout
    end.


%% Asks a helper process to leave its loop and waits up to 4 seconds for the
%% acknowledgement. Name is a human readable label used in the diagnostic
%% printed on success and in the bail-out message on timeout.
stop(Pid, Name) ->
    Tag = make_ref(),
    Pid ! {stop, Tag},
    receive
    {ok, Tag} -> etap:diag(lists:append("Stopped ", Name))
    after 4000 -> etap:bail(lists:append("Timeout stopping ", Name))
    end.
