import fnmatch
import io
import os
import pathlib
import queue
import subprocess
import sys
import threading
import time
import weakref
import random
from queue import Queue
from typing import Callable, Dict, List, Optional, Set

import pytest
from _pytest._code import Source

import deltachat

from . import Account, account_hookimpl, const, get_core_info
from .events import FFIEventLogger, FFIEventTracker


def pytest_addoption(parser):
    """Register the command line options of the deltachat test plugin."""
    # (flag, keyword arguments) pairs; they are registered in this order.
    option_specs = (
        (
            "--chatmail",
            {
                "action": "store",
                "default": None,
                "help": "chatmail server domain name",
            },
        ),
        (
            "--ignored",
            {
                "action": "store_true",
                "help": "Also run tests marked with the ignored marker",
            },
        ),
        (
            "--strict-tls",
            {
                "action": "store_true",
                "help": "Never accept invalid TLS certificates for test accounts",
            },
        ),
        (
            "--extra-info",
            {
                "action": "store_true",
                "help": "show more info on failures (imap server state, config)",
            },
        ),
        (
            "--debug-setup",
            {
                "action": "store_true",
                "help": "show events during configure and start io phases of online accounts",
            },
        ),
    )

    group = parser.getgroup("deltachat testplugin options")
    for flag, kwargs in option_specs:
        group.addoption(flag, **kwargs)


def pytest_configure(config):
    """Apply the CHATMAIL_DOMAIN fallback and install the per-phase logging plugin."""
    # The command line option wins; the environment variable is only a fallback.
    if not config.getoption("--chatmail"):
        env_domain = os.getenv("CHATMAIL_DOMAIN")
        if env_domain:
            config.option.chatmail = env_domain

    # Account threads keep running between test phases and would garble the
    # output.  Therefore every account ever created is tracked in a weak set
    # (so that tracking does not keep accounts alive) and its logging is
    # switched on/off around each pytest test phase (setup/call/teardown).
    # The acfactory's logging default for new accounts is switched along.

    class LoggingAspect:
        def __init__(self) -> None:
            self._accounts: weakref.WeakSet = weakref.WeakSet()

        @deltachat.global_hookimpl
        def dc_account_init(self, account):
            self._accounts.add(account)

        def _toggle(self, item, enabled):
            # Shared implementation of enable_logging()/disable_logging().
            for tracked in self._accounts:
                if enabled:
                    tracked.enable_logging()
                else:
                    tracked.disable_logging()
            factory = item.funcargs.get("acfactory")
            if factory:
                factory.set_logging_default(enabled)

        def disable_logging(self, item):
            self._toggle(item, False)

        def enable_logging(self, item):
            self._toggle(item, True)

        @pytest.hookimpl(hookwrapper=True)
        def pytest_runtest_setup(self, item):
            # Tests marked "ignored" only run when --ignored was given.
            if item.get_closest_marker("ignored") and not item.config.getvalue("ignored"):
                pytest.skip("use --ignored to run this test")
            self.enable_logging(item)
            yield
            self.disable_logging(item)

        @pytest.hookimpl(hookwrapper=True)
        def pytest_pyfunc_call(self, pyfuncitem):
            self.enable_logging(pyfuncitem)
            yield
            self.disable_logging(pyfuncitem)

        @pytest.hookimpl(hookwrapper=True)
        def pytest_runtest_teardown(self, item):
            # Teardown output is only shown when --extra-info was requested.
            extra_info = item.config.getoption("--extra-info")
            if extra_info:
                self.enable_logging(item)
            yield
            if extra_info:
                self.disable_logging(item)

    aspect = LoggingAspect()
    config.pluginmanager.register(aspect)
    deltachat.register_global_plugin(aspect)


def pytest_report_header(config):
    """Return the header lines describing the core library and the chatmail provider."""
    core = get_core_info()
    lines = [
        f"Deltachat core={core['deltachat_core_version']} "
        f"sqlite={core['sqlite_version']} "
        f"journal_mode={core['journal_mode']}",
    ]

    # Only mention the account provider when one was configured.
    domain = config.getoption("--chatmail")
    if domain:
        lines.append(f"Chatmail account provider: {domain}")

    return lines


@pytest.fixture(scope="session")
def testprocess(request):
    """Return live account configuration manager.

    One :class:`TestProcess` object is created per test session and
    receives the pytest configuration of the requesting session."""
    manager = TestProcess(pytestconfig=request.config)
    return manager


class TestProcess:
    """A pytest session-scoped instance to help with managing "live" account configurations.

    It remembers the live account configs it handed out and keeps an
    in-memory copy of the files of configured accounts (keyed by address)
    so that later tests can re-use them.
    """

    _addr2files: Dict[str, Dict[pathlib.Path, bytes]]

    def __init__(self, pytestconfig) -> None:
        self.pytestconfig = pytestconfig
        # address -> {file path relative to the account directory: content}
        self._addr2files = {}
        # live configs created so far, in creation order
        self._configlist: List[Dict[str, str]] = []

    def get_liveconfig_producer(self):
        """provide live account configs, cached on a per-test-process scope
        so that test functions can re-use already known live configs.

        This is a generator: it first yields the configs that were already
        created and creates a new random one whenever more are requested.
        It fails the test when more than 10 configs are requested from a
        single producer, and skips the test when no chatmail domain is set.
        """
        chatmail_opt = self.pytestconfig.getoption("--chatmail")
        if chatmail_opt:
            # Use a chatmail instance.
            domain = chatmail_opt
            MAX_LIVE_CREATED_ACCOUNTS = 10
            for index in range(MAX_LIVE_CREATED_ACCOUNTS):
                try:
                    yield self._configlist[index]
                except IndexError:
                    # No cached config at this index yet: make up a new user.
                    part = "".join(random.choices("2345789acdefghjkmnpqrstuvwxyz", k=6))
                    username = f"ci-{part}"
                    password = f"{username}${username}"
                    addr = f"{username}@{domain}"
                    config = {"addr": addr, "mail_pw": password}
                    print("newtmpuser {}: addr={}".format(index, config["addr"]))
                    self._configlist.append(config)
                    yield config
            pytest.fail(f"more than {MAX_LIVE_CREATED_ACCOUNTS} live accounts requested.")
        else:
            pytest.skip(
                "specify CHATMAIL_DOMAIN or --chatmail to provide live accounts",
            )

    def cache_maybe_retrieve_configured_db_files(self, cache_addr, db_target_path):
        """Write the cached files for `cache_addr` next to `db_target_path`.

        `db_target_path` must not exist yet.  Returns True if files were
        cached for the address and have been written into the parent
        directory of `db_target_path`, False on a cache miss.
        """
        db_target_path = pathlib.Path(db_target_path)
        assert not db_target_path.exists()

        try:
            filescache = self._addr2files[cache_addr]
        except KeyError:
            print("CACHE FAIL for", cache_addr)
            return False
        else:
            print("CACHE HIT for", cache_addr)
            targetdir = db_target_path.parent
            write_dict_to_dir(filescache, targetdir)
            return True

    def cache_maybe_store_configured_db_files(self, acc):
        """Cache the files of the configured account `acc` under its address.

        Returns True if a new cache entry was stored and False if the
        address was already cached (existing entries are never replaced).
        """
        addr = acc.get_config("addr")
        assert acc.is_configured()
        # don't overwrite existing entries
        if addr not in self._addr2files:
            print("storing cache for", addr)
            # everything in the parent directory of the blob directory is cached
            basedir = pathlib.Path(acc.get_blobdir()).parent
            self._addr2files[addr] = create_dict_from_files_in_path(basedir)
            return True
        return False


def create_dict_from_files_in_path(base):
    """Return a dict mapping each file below `base` (path relative to `base`) to its bytes."""
    return {
        entry.relative_to(base): entry.read_bytes()
        for entry in base.glob("**/*")
        if entry.is_file()  # directories are skipped
    }


def write_dict_to_dir(dic, target_dir):
    """Write each `relative path -> bytes` entry of `dic` below `target_dir`.

    Missing parent directories are created; existing files are overwritten.
    `dic` must not be empty.
    """
    assert dic
    for relpath, content in dic.items():
        path = target_dir.joinpath(relpath)
        # exist_ok avoids the check-then-create race of a separate exists() test
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(content)


@pytest.fixture()
def data(request):
    """Provide helpers for locating and reading test data files."""

    class Data:
        def __init__(self) -> None:
            # The test data location is guessed heuristically because
            # tests may run from a dev-setup with pytest directly, through
            # tox, and maybe also from deltachat-binding users like
            # "deltabot".
            test_dir = os.path.dirname(request.fspath.strpath)
            plugin_dir = os.path.dirname(__file__)
            candidates = (
                os.path.join(test_dir, "data"),
                os.path.join(test_dir, "..", "..", "test-data"),
                os.path.join(plugin_dir, "..", "..", "..", "test-data"),
            )
            self.paths = list(map(os.path.normpath, candidates))

        def get_path(self, bn):
            """return path of file or None if it doesn't exist."""
            parts = bn.split("/")
            for directory in self.paths:
                fn = os.path.join(directory, *parts)
                if os.path.exists(fn):
                    return fn
            # only the last candidate that was tried is reported
            print(f"WARNING: path does not exist: {fn!r}")
            return None

        def read_path(self, bn, mode="r"):
            """return content of the file (opened with `mode`) or None if it doesn't exist."""
            fn = self.get_path(bn)
            if fn is None:
                return None
            with open(fn, mode) as f:
                return f.read()

    return Data()


class ACSetup:
    """
    Accounts setup helper to deal with multiple configure-process
    and io & imap initialization phases.

    From tests, use the higher level
    public ACFactory methods instead of its private helper class.

    Each tracked account is in one of the states CONFIGURING, CONFIGURED
    or IDLEREADY, recorded in ``_account2state``.
    """

    CONFIGURING = "CONFIGURING"
    CONFIGURED = "CONFIGURED"
    IDLEREADY = "IDLEREADY"

    # receives one (account, success, comment) tuple per completed configure
    _configured_events: Queue

    def __init__(self, testprocess, init_time) -> None:
        self._configured_events = Queue()
        self._account2state: Dict[Account, str] = {}
        # addresses whose IMAP server state was already cleaned up
        self._imap_cleaned: Set[str] = set()
        self.testprocess = testprocess
        self.init_time = init_time

    def log(self, *args):
        """print `args` prefixed with the seconds elapsed since `init_time`."""
        print("[acsetup]", f"{time.time() - self.init_time:.3f}", *args)

    def add_configured(self, account):
        """add an already configured account."""
        assert account.is_configured()
        self._account2state[account] = self.CONFIGURED
        self.log("added already configured account", account, account.get_config("addr"))

    def start_configure(self, account):
        """add an account and start its configure process."""

        class PendingTracker:
            # `this` is the plugin instance; `self` is the enclosing ACSetup.
            @account_hookimpl
            def ac_configure_completed(this, success: bool, comment: Optional[str]) -> None:
                self._configured_events.put((account, success, comment))

        account.add_account_plugin(PendingTracker(), name="pending_tracker")
        self._account2state[account] = self.CONFIGURING
        account.configure()
        self.log("started configure on", account)

    def wait_one_configured(self, account):
        """wait until this account has successfully configured.

        Does nothing unless the account is in CONFIGURING state.  Other
        accounts whose configure completes in the meantime are marked
        CONFIGURED but are not initialized any further here.
        """
        if self._account2state[account] == self.CONFIGURING:
            while True:
                acc = self._pop_config_success()
                if acc == account:
                    break
            self.init_imap(acc)
            self.init_logging(acc)
            acc._evtracker.consume_events()

    def bring_online(self):
        """Wait for all accounts to become ready to receive messages.

        This will initialize logging, start IO and the direct_imap attribute
        for each account which either is CONFIGURED already or which is CONFIGURING
        and successfully completing the configuration process.
        """
        print("wait_all_configured finds accounts=", self._account2state)
        # Only values of existing keys are replaced while iterating.
        for acc, state in self._account2state.items():
            if state == self.CONFIGURED:
                self._onconfigure_start_io(acc)
                self._account2state[acc] = self.IDLEREADY

        while self.CONFIGURING in self._account2state.values():
            acc = self._pop_config_success()
            self._onconfigure_start_io(acc)
            self._account2state[acc] = self.IDLEREADY
        print("finished, account2state", self._account2state)

    def _pop_config_success(self):
        """block for the next configure result and return its account.

        Fails the test if the configure process was not successful,
        otherwise marks the account as CONFIGURED.
        """
        acc, success, comment = self._configured_events.get()
        if not success:
            pytest.fail(f"configuring online account {acc} failed: {comment}")
        self._account2state[acc] = self.CONFIGURED
        return acc

    def _onconfigure_start_io(self, acc):
        """initialize imap and logging, start IO and wait for inbox IDLE."""
        self.init_imap(acc)
        self.init_logging(acc)
        acc.start_io()
        print(acc._logid, "waiting for inbox IDLE to become ready")
        acc._evtracker.wait_idle_inbox_ready()
        acc._evtracker.consume_events()
        acc.log("inbox IDLE ready")

    def init_logging(self, acc):
        """idempotent function for initializing logging (will replace existing logger)."""
        logger = FFIEventLogger(acc, logid=acc._logid, init_time=self.init_time)
        acc.add_account_plugin(logger, name="logger-" + acc._logid)

    def init_imap(self, acc):
        """initialize direct_imap and cleanup server state.

        The cleanup empties the "inbox" and "deltachat" folders and deletes
        all other folders; it is done only once per address.
        """
        from deltachat.direct_imap import DirectImap

        assert acc.is_configured()
        if not hasattr(acc, "direct_imap"):
            acc.direct_imap = DirectImap(acc)
        addr = acc.get_config("addr")
        if addr not in self._imap_cleaned:
            imap = acc.direct_imap
            for folder in imap.list_folders():
                if folder.lower() == "inbox" or folder.lower() == "deltachat":
                    # Select outside of the assert: the call is required for
                    # the following delete and must also run under "python -O".
                    selected = imap.select_folder(folder)
                    assert selected
                    imap.delete("1:*", expunge=True)
                else:
                    imap.conn.folder.delete(folder)
            acc.log(f"imap cleaned for addr {addr}")
            self._imap_cleaned.add(addr)


class ACFactory:
    """Account factory

    Creates accounts below `tmpdir` for a single test and shuts them
    down again in :meth:`finalize`, which is registered as a finalizer
    on the test request.
    """

    init_time: float
    _finalizers: List[Callable[[], None]]
    _accounts: List[Account]
    _acsetup: ACSetup
    _preconfigured_keys: List[str]

    def __init__(self, request, testprocess, tmpdir, data) -> None:
        self.init_time = time.time()
        self.tmpdir = tmpdir
        self.pytestconfig = request.config
        self.data = data
        self.testprocess = testprocess
        self._liveconfig_producer = testprocess.get_liveconfig_producer()

        self._finalizers = []
        # May contain None placeholders, see run_bot_process().
        self._accounts = []
        self._acsetup = ACSetup(testprocess, self.init_time)
        self._preconfigured_keys = ["alice", "bob", "charlie", "dom", "elena", "fiona"]
        self.set_logging_default(False)
        request.addfinalizer(self.finalize)

    def log(self, *args):
        """print `args` prefixed with the seconds elapsed since `init_time`."""
        print("[acfactory]", f"{time.time() - self.init_time:.3f}", *args)

    def finalize(self):
        """Run registered finalizers (last registered first), then shut down all accounts."""
        while self._finalizers:
            fin = self._finalizers.pop()
            fin()

        while self._accounts:
            acc = self._accounts.pop()
            # None entries are placeholders for accounts owned by a bot process
            if acc is not None:
                imap = getattr(acc, "direct_imap", None)
                if imap is not None:
                    imap.shutdown()
                    del acc.direct_imap
                acc.shutdown()
                acc.disable_logging()

    def get_next_liveconfig(self):
        """
        Base function to get functional online configurations
        where we can make valid SMTP and IMAP connections with.

        Returns a copy of the next config of the live config producer,
        with "e2ee_enabled" defaulting to "1" and with strict certificate
        checks when --strict-tls was given.
        """
        configdict = next(self._liveconfig_producer).copy()
        if "e2ee_enabled" not in configdict:
            configdict["e2ee_enabled"] = "1"

        if self.pytestconfig.getoption("--strict-tls"):
            # Enable strict certificate checks for online accounts
            configdict["imap_certificate_checks"] = str(const.DC_CERTCK_STRICT)
            configdict["smtp_certificate_checks"] = str(const.DC_CERTCK_STRICT)

        assert "addr" in configdict and "mail_pw" in configdict
        return configdict

    def _get_cached_account(self, addr) -> Optional[Account]:
        """Return a new account restored from the file cache for `addr`, or None on a cache miss."""
        if addr in self.testprocess._addr2files:
            return self._getaccount(addr)
        return None

    def get_unconfigured_account(self, closed=False) -> Account:
        """Return a new account without any configuration applied."""
        return self._getaccount(closed=closed)

    def _getaccount(self, try_cache_addr=None, closed=False) -> Account:
        """Create an account in a fresh sub directory of `tmpdir` and track it.

        If `try_cache_addr` is given, files cached for that address are
        written into the directory before the account is opened.
        """
        logid = f"ac{len(self._accounts) + 1}"
        # we need to use fixed database basename for maybe_cache_* functions to work
        path = self.tmpdir.mkdir(logid).join("dc.db")
        if try_cache_addr:
            self.testprocess.cache_maybe_retrieve_configured_db_files(try_cache_addr, path)
        ac = Account(path.strpath, logging=self._logging, closed=closed)
        ac._logid = logid  # later instantiated FFIEventLogger needs this
        ac._evtracker = ac.add_account_plugin(FFIEventTracker(ac))
        if self.pytestconfig.getoption("--debug-setup"):
            self._acsetup.init_logging(ac)
        self._accounts.append(ac)
        return ac

    def set_logging_default(self, logging) -> None:
        """Set the `logging` flag passed to accounts created from now on."""
        self._logging = bool(logging)

    def remove_preconfigured_keys(self) -> None:
        """Forget all preconfigured keys so that new accounts do not receive one."""
        self._preconfigured_keys = []

    def _preconfigure_key(self, account, addr):
        """Set the next unused preconfigured key on `account`.

        Returns True if a key was set.  Returns None if no key name is
        left or if the key data could not be read (the latter prints a
        warning).
        """
        # Only set a preconfigured key if we haven't used it yet for another account.
        try:
            keyname = self._preconfigured_keys.pop(0)
        except IndexError:
            pass
        else:
            fname_sec = self.data.read_path(f"key/{keyname}-secret.asc")
            if fname_sec:
                account._preconfigure_keypair(addr, fname_sec)
                return True
            print(f"WARN: could not use preconfigured keys for {addr!r}")

    def get_pseudo_configured_account(self, passphrase: Optional[str] = None) -> Account:
        """Return an offline account whose config marks it as configured.

        If `passphrase` is given the account is created closed and then
        opened with that passphrase.
        """
        # do a pseudo-configured account
        ac = self.get_unconfigured_account(closed=bool(passphrase))
        if passphrase:
            ac.open(passphrase)
        acname = ac._logid
        addr = f"{acname}@offline.org"
        ac.update_config(
            {
                "addr": addr,
                "displayname": acname,
                "mail_pw": "123",
                "configured_addr": addr,
                "configured_mail_pw": "123",
                "configured": "1",
            },
        )
        self._preconfigure_key(ac, addr)
        self._acsetup.init_logging(ac)
        return ac

    def new_online_configuring_account(self, cloned_from=None, cache=False, **kwargs) -> Account:
        """Return an online account that is configured already or is being configured.

        The config comes from the next live config, or from the address,
        password and certificate check settings of `cloned_from`; `kwargs`
        override config values.  With `cache` set, an account restored
        from the file cache is returned if one exists for the address.
        """
        if cloned_from is None:
            configdict = self.get_next_liveconfig()
        else:
            # XXX we might want to transfer the key to the new account
            configdict = {
                "addr": cloned_from.get_config("addr"),
                "mail_pw": cloned_from.get_config("mail_pw"),
                "imap_certificate_checks": cloned_from.get_config("imap_certificate_checks"),
                "smtp_certificate_checks": cloned_from.get_config("smtp_certificate_checks"),
            }
        configdict.update(kwargs)
        ac = self._get_cached_account(addr=configdict["addr"]) if cache else None
        if ac is not None:
            # make sure we consume a preconfig key, as if we had created a fresh account
            # (there may be none left, e.g. after remove_preconfigured_keys())
            if self._preconfigured_keys:
                self._preconfigured_keys.pop(0)
            self._acsetup.add_configured(ac)
            return ac
        ac = self.prepare_account_from_liveconfig(configdict)
        self._acsetup.start_configure(ac)
        return ac

    def prepare_account_from_liveconfig(self, configdict) -> Account:
        """Return a new account with `configdict` applied and a preconfigured key if available.

        Note that missing defaults are filled into `configdict` in place.
        """
        ac = self.get_unconfigured_account()
        assert "addr" in configdict and "mail_pw" in configdict, configdict
        configdict.setdefault("bcc_self", False)
        configdict.setdefault("mvbox_move", False)
        configdict.setdefault("sentbox_watch", False)
        configdict.setdefault("sync_msgs", False)
        ac.update_config(configdict)
        self._preconfigure_key(ac, configdict["addr"])
        return ac

    def wait_configured(self, account) -> None:
        """Wait until the specified account has successfully completed configure."""
        self._acsetup.wait_one_configured(account)

    def bring_accounts_online(self) -> None:
        """Wait until all accounts known to the setup helper are online."""
        print("bringing accounts online")
        self._acsetup.bring_online()
        print("all accounts online")

    def get_online_accounts(self, num):
        """Return a list of `num` online accounts, using and filling the file cache."""
        accounts = [self.new_online_configuring_account(cache=True) for i in range(num)]
        self.bring_accounts_online()
        # we cache fully configured and started accounts
        for acc in accounts:
            self.testprocess.cache_maybe_store_configured_db_files(acc)
        return accounts

    def run_bot_process(self, module, ffi=True):
        """Run the file of `module` as a bot subprocess with a fresh live account.

        Returns a :class:`BotProcess`; killing it is registered as a finalizer.
        """
        fn = module.__file__

        bot_cfg = self.get_next_liveconfig()
        bot_ac = self.prepare_account_from_liveconfig(bot_cfg)

        # Forget ac as it will be opened by the bot subprocess
        # but keep something in the list to not confuse account generation
        self._accounts[self._accounts.index(bot_ac)] = None

        args = [
            sys.executable,
            "-u",
            fn,
            "--email",
            bot_cfg["addr"],
            "--password",
            bot_cfg["mail_pw"],
            bot_ac.db_path,
        ]
        if ffi:
            # the database path stays the last argument
            args.insert(-1, "--show-ffi")
        print("$", " ".join(args))
        popen = subprocess.Popen(
            args=args,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # combine stdout/stderr in one stream
            bufsize=0,  # unbuffered
            close_fds=True,  # close all FDs other than 0/1/2
            universal_newlines=True,  # give back text
        )
        bot = BotProcess(popen, addr=bot_cfg["addr"])
        self._finalizers.append(bot.kill)
        return bot

    def dump_imap_summary(self, logfile):
        """Write account info and, where available, IMAP structures of all accounts to `logfile`."""
        for ac in self._accounts:
            if ac is None:
                # placeholder for an account handed over to a bot process
                continue
            ac.dump_account_info(logfile=logfile)
            imap = getattr(ac, "direct_imap", None)
            if imap is not None:
                imap.dump_imap_structures(self.tmpdir, logfile=logfile)

    def get_accepted_chat(self, ac1: Account, ac2: Account):
        """Create chats in both directions and return the chat of `ac1` with `ac2`."""
        ac2.create_chat(ac1)
        return ac1.create_chat(ac2)

    def get_protected_chat(self, ac1: Account, ac2: Account):
        """Create a verified group on `ac1`, let `ac2` join it via QR code and return the group chat."""
        chat = ac1.create_group_chat("Protected Group", verified=True)
        qr = chat.get_join_qr()
        ac2.qr_join_chat(qr)
        ac2._evtracker.wait_securejoin_joiner_progress(1000)
        ev = ac2._evtracker.get_matching("DC_EVENT_MSGS_CHANGED")
        msg = ac2.get_message_by_id(ev.data2)
        assert msg is not None
        assert msg.text == "Messages are guaranteed to be end-to-end encrypted from now on."
        msg = ac2._evtracker.wait_next_incoming_message()
        assert msg is not None
        assert "Member Me " in msg.text and " added by " in msg.text
        return chat

    def introduce_each_other(self, accounts, sending=True):
        """Create accepted chats between every pair of `accounts`.

        With `sending` set, each pair also exchanges "hi" / "hi back" and
        this method waits for one incoming message per sent message.
        """
        to_wait = []
        for i, acc in enumerate(accounts):
            for acc2 in accounts[i + 1 :]:
                chat = self.get_accepted_chat(acc, acc2)
                if sending:
                    chat.send_text("hi")
                    to_wait.append(acc2)
                    acc2.create_chat(acc).send_text("hi back")
                    to_wait.append(acc)
        for acc in to_wait:
            acc.log("waiting for incoming message")
            acc._evtracker.wait_next_incoming_message()


@pytest.fixture()
def acfactory(request, tmpdir, testprocess, data):
    """Account factory.

    After a failed test call, and only with --extra-info, a summary of
    the accounts and their IMAP state is printed.
    """
    factory = ACFactory(request=request, tmpdir=tmpdir, testprocess=testprocess, data=data)
    yield factory

    # "rep_call" is only present once a report for the call phase was stored.
    if not hasattr(request.node, "rep_call") or not request.node.rep_call.failed:
        return
    if not testprocess.pytestconfig.getoption("--extra-info"):
        return

    summary = io.StringIO()
    factory.dump_imap_summary(logfile=summary)
    print(summary.getvalue())


class BotProcess:
    """Wrapper around a bot subprocess whose output is read in a background thread."""

    # stripped output lines; a final None marks the end of the output
    stdout_queue: queue.Queue

    def __init__(self, popen, addr) -> None:
        self.popen = popen
        self.addr = addr

        # we read stdout as quickly as we can in a thread and make
        # the (unicode) lines available for readers through a queue.
        self.stdout_queue = queue.Queue()
        self.stdout_thread = t = threading.Thread(target=self._run_stdout_thread, name="bot-stdout-thread")
        t.daemon = True
        t.start()

    def _run_stdout_thread(self) -> None:
        """Forward stripped stdout lines to the queue until the stream ends."""
        try:
            while True:
                line = self.popen.stdout.readline()
                if not line:
                    break
                line = line.strip()
                self.stdout_queue.put(line)
                print("bot-stdout: ", line)
        finally:
            # always tell readers that no more lines will arrive
            self.stdout_queue.put(None)

    def kill(self) -> None:
        """Kill the bot process and wait for it so that it does not remain as a zombie."""
        self.popen.kill()
        self.popen.wait()

    def wait(self, timeout=None) -> None:
        """Wait for the bot process to terminate, at most `timeout` seconds if given."""
        self.popen.wait(timeout=timeout)

    def fnmatch_lines(self, pattern_lines):
        """Consume output lines until each pattern was matched, in order.

        `pattern_lines` is a multi-line string with one fnmatch pattern
        per non-empty line.  Lines that do not match the current pattern
        are skipped.  Raises IOError if the output ends before all
        patterns were matched.
        """
        patterns = [x.strip() for x in Source(pattern_lines.rstrip()).lines if x.strip()]
        for next_pattern in patterns:
            print("+++FNMATCH:", next_pattern)
            ignored = []
            while True:
                line = self.stdout_queue.get()
                if line is None:
                    if ignored:
                        print("BOT stdout terminated after these lines")
                        for ignored_line in ignored:
                            print(ignored_line)
                    raise IOError("BOT stdout-thread terminated")
                if fnmatch.fnmatch(line, next_pattern):
                    print("+++MATCHED:", line)
                    break
                else:
                    print("+++IGN:", line)
                    ignored.append(line)


@pytest.fixture()
def tmp_db_path(tmpdir):
    """Return the path (as a string) of a "test.db" file inside the temporary directory."""
    db_file = tmpdir.join("test.db")
    return db_file.strpath


@pytest.fixture()
def lp():
    """Log printer fixture."""

    class Printer:
        def sec(self, msg: str) -> None:
            """Print a section header, preceded by an empty line."""
            bar = "=" * 10
            print()
            print(bar, msg, bar)

        def step(self, msg: str) -> None:
            """Print a step marker."""
            dashes = "-" * 5
            print(dashes, "step " + msg, dashes)

        def indent(self, msg: str) -> None:
            """Print the message indented by two spaces."""
            indented = "  " + msg
            print(indented)

    printer = Printer()
    return printer


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """Store the report of each test phase on the item as ``rep_<phase>``."""
    # let all other hook implementations run and produce the report
    report = (yield).get_result()

    # the phase is one of "setup", "call", "teardown", which results in
    # the attributes rep_setup, rep_call and rep_teardown
    attr_name = "rep_" + report.when
    setattr(item, attr_name, report)
