import time
from datetime import datetime
import boto3
from botocore.exceptions import ClientError
from typing import NamedTuple
import os
import json
import xml.etree.ElementTree as ET
from confluent_kafka.admin import AdminClient
from aws.constants import PROD_ACCOUNT_ENVS, PROD_ENVS
from actions_logging.app_logging import logger
from github.env import get_required_env_var, exit_on_error_and_write_summary, write_github_env
from aws.msk.msk_info import get_msk_environment_name
from aws.client import create_client_or_exit
from common.common import run_command
from aws.env_info import get_env_bucket

_AWS_REGION = get_required_env_var("AWS_MSK_REGION")
_CONNECTOR_NAME = get_required_env_var("CONNECTOR_NAME")
kafka_env_name = get_required_env_var("KAFKA_ENV")


def get_kafka_credentials(kafka_env_name, aws_region):
    """
    Retrieve Kafka credentials based on environment.
    Pulls credentials from AWS Secrets Manager for production environments.
    """
    secret_name = f"AmazonMSK_{kafka_env_name.replace('-', '_') if kafka_env_name == 'production-eu' else kafka_env_name}_cluster"

    if kafka_env_name in PROD_ACCOUNT_ENVS or kafka_env_name in PROD_ENVS:
        bootstrap_servers = get_required_env_var("KAFKA_BROKERS")

        # Retrieve secret from AWS Secrets Manager
        sasl_username, sasl_password = None, None
        secrets_client = create_client_or_exit("secretsmanager", aws_region)

        try:
            secret_dict = json.loads(secrets_client.get_secret_value(SecretId=secret_name)["SecretString"])
            sasl_username, sasl_password = secret_dict.get("username"), secret_dict.get("password")
        except secrets_client.exceptions.ResourceNotFoundException:
            logger.info(f"Secret {secret_name} not found in region {aws_region}.")
        except Exception as e:
            logger.info(f"An error occurred: {e}")

    else:
        bootstrap_servers = get_required_env_var("GLOBAL_CICD_KAFKA_BROKERS")
        sasl_username = get_required_env_var("GLOBAL_CICD_KAFKA_USERNAME_SM")
        sasl_password = get_required_env_var("GLOBAL_CICD_KAFKA_PASSWORD_SM")

    # Initialize Kafka AdminClient with the retrieved credentials
    return AdminClient({
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-512',
        'sasl.username': sasl_username,
        'sasl.password': sasl_password
    })


# Initialize Kafka AdminClient
admin_client = get_kafka_credentials(kafka_env_name, _AWS_REGION)


class PluginDetails(NamedTuple):
    name: str
    arn: str


def delete_connectors(conn_name) -> None:
    """
    Delete previous connectors
    :return: None
    """

    def _wait_until_connector_is_deleted(conn_arn: str, sleep_time: int = 10):
        go = True
        counter = 1
        while go:
            try:
                resp = kafka_connect_client.describe_connector(connectorArn=conn_arn)
                go = resp['connectorState'] == 'DELETING'
                logger.info(
                    f'{counter}) Current connector state: {resp["connectorState"]}. {counter * sleep_time} seconds passed')
                run_command(
                    f"echo Current connector state: {resp['connectorState']}. {counter * sleep_time} seconds passed")
                counter += 1
                time.sleep(sleep_time)
            except ClientError as ex:
                if ex.response['Error']['Code'] == 'NotFoundException':
                    logger.info(f'Connector was deleted.')
                    run_command(f'echo Connector was deleted.')
                    go = False
                else:
                    logger.error(f'Failed to delete connector. Reason: {str(ex)}')
                    run_command(f'echo Failed to delete connector. Reason: {str(ex)}')
                    exit_on_error_and_write_summary(f'Failed to delete connector. Reason: {str(ex)}')
                    raise ex

    response = list_connectors(conn_name)
    connectors_list = response['connectors']
    if not connectors_list:
        logger.info(f'No connectors matching prefix ({conn_name}) found.')
        return

    logger.info(
        f'{len(connectors_list)} connectors matching prefix ({conn_name}) found. Waiting for the connectors to be deleted..')
    for conn in connectors_list:
        try:
            response = kafka_connect_client.delete_connector(connectorArn=conn['connectorArn'])
            _wait_until_connector_is_deleted(response['connectorArn'])
            logger.info(f"connector {conn_name} has been deleted")
            print(f"connector {conn_name} has been deleted")
        except Exception as e:
            logger.error(f'Failed to delete connector. Reason: {e}')
            exit_on_error_and_write_summary(f'Failed to delete connector. Reason: {e}')
    logger.info('Finished deleting connectors.')


def delete_connector_default_topics(connector):
    topics = admin_client.list_topics().topics
    topics_to_delete = [topic for topic in topics if topic.startswith('__amazon') and connector in topic]
    if topics_to_delete:
        fs = admin_client.delete_topics(topics_to_delete, operation_timeout=30)
        for topic, f in fs.items():
            try:
                f.result()
                logger.info("Topic {} deleted".format(topic))
            except Exception as e:
                logger.error("Failed to delete topic {}: {}".format(topic, e))
    else:
        logger.info("No topics found")


def list_connectors(prefix) -> dict:
    kafka_connect_client = create_client_or_exit('kafkaconnect', _AWS_REGION)
    response = kafka_connect_client.list_connectors(
        connectorNamePrefix=prefix,
        maxResults=100
    )
    connectors_list = response['connectors']
    logger.info('--  connectors list --')
    for i, conn in enumerate(connectors_list, 1):
        logger.info(f'{i}) {conn["connectorArn"]}')
    logger.info('--  connectors list end --')
    return response


def list_plugins():
    plugins = kafka_connect_client.list_custom_plugins()
    plugins_arn = []
    logger.info('--  plugins list --')
    for i, plugin in enumerate(plugins['customPlugins'], 1):
        logger.info(f'{i}) {plugin["name"]} ({plugin["creationTime"]} - {plugin["customPluginState"]})')
        plugins_arn.append(plugin['customPluginArn'])
    logger.info('--  plugins list end --')
    return plugins_arn


def create_plugin(plugin_name, bucket_name, zip_file_path) -> PluginDetails:
    try:
        response = kafka_connect_client.create_custom_plugin(
            contentType='ZIP',
            description='A custom Kafka connect FWL S3 connector',
            location={
                's3Location': {
                    'bucketArn': f"{bucket_name}",
                    'fileKey': f"{zip_file_path}"
                }
            },
            name=f"{plugin_name}"
        )
        logger.info(f"plugin - {plugin_name} created")
        return PluginDetails(response['name'], response['customPluginArn'])
    except ClientError as e:
        logger.error(f'ERROR: Failed to create plugin: {str(e)}')
        return None


def nowStr() -> str:
    return datetime.now().strftime("%Y/%m/%d %H:%M:%S")


def wait_until_all_plugins_deleted(relevant_plugins):
   # if relevant_plugins is empty, return True
    if not relevant_plugins:
        logger.info("no relevant plugins found")
        return True
    deleted = 0
    for plugin in relevant_plugins:
        try:
            response = kafka_connect_client.describe_custom_plugin(customPluginArn=plugin)
            if response['customPluginState'] != 'DELETING':
                logger.error(f"{plugin} should be in deleting status. not in {response['customPluginState']}, exiting")
                exit(1)
            else:
                logger.info(f"plugin {plugin} still deleting")
                return False
        except kafka_connect_client.exceptions.NotFoundException:
            logger.info(f"plugin {plugin} not found")
            deleted += 1
            if deleted == len(relevant_plugins):
                logger.info(f"deleted plugins num: {deleted}, total: {len(relevant_plugins)}")
                return True
        except Exception as e:
            logger.error(f"error: {e}")



def wait_until_plugin_created(plugin_arn):
    response = kafka_connect_client.describe_custom_plugin(customPluginArn=plugin_arn)
    return response['customPluginState'] == 'ACTIVE'


if __name__ == "__main__":
    logger.info("sts caller identity:")
    sts = boto3.client("sts")
    response = sts.get_caller_identity()
    logger.info(response)
    run_command('aws sts get-caller-identity --output text')
    env = get_required_env_var('ENV_NAME')
    msk_env = get_msk_environment_name(env)
    VERSION = os.getenv("PLUGIN_VERSION")
    zip_file_path = os.getenv("PLUGIN_ZIP_FILE_PATH")
    env_prefix = os.getenv("env_prefix", "")
    env_prefix = "-qa" if env_prefix == "qa" else ""

    bucket_name = f"arn:aws:s3:::{get_env_bucket(env)}"
    if _CONNECTOR_NAME == 'placeholder':
        mytree = ET.parse('../guest-code/pom.xml')
        myroot = mytree.getroot()
        ns = myroot.tag
        version_ns = ns.replace('}project', '}artifactId')
        _CONNECTOR_NAME = myroot.find(version_ns).text
        logger.info(f"connector name from pom: {_CONNECTOR_NAME}")

    DELETE_ONLY = os.getenv('A_DELETE_ONLY', 'false')
    kafka_connect_client = boto3.client('kafkaconnect', region_name=_AWS_REGION)

    logger.info(f'{nowStr()} - Started')
    logger.info(f'ENV: {env}')
    conn_plugin_name = f'{msk_env}-{_CONNECTOR_NAME}'

    logger.debug(f"conn_plugin_name: {conn_plugin_name}")
    delete_connectors(conn_plugin_name)
    run_command(f"echo connector was deleted")

    all_plugins_list = list_plugins()
    logger.info("start to delete all custom plugins")
    relevant_plugins = [plugin_arn for plugin_arn in all_plugins_list if conn_plugin_name in plugin_arn]
    for plugin_arn in relevant_plugins:
        try:
            logger.info(
                f"aws kafkaconnect delete-custom-plugin --custom-plugin-arn {plugin_arn} --region {_AWS_REGION}")
            run_command(f"aws kafkaconnect delete-custom-plugin --custom-plugin-arn {plugin_arn} --region {_AWS_REGION}")
        except Exception as e:
            logger.error(f"delete plugin {plugin_arn} error: {e}")

    for c in range(12):
        time.sleep(5)
        if wait_until_all_plugins_deleted(relevant_plugins):
            logger.info("all plugins deleted successfully")
            break
        if c == 11:
            logger.info("the plugins did not delete successfully, exit")
            exit(1)

    logger.info("done with deletion of plugins")
    if DELETE_ONLY == 'true':
        logger.info("A_DELETE_ONLY set to true. exit now")
        exit(0)

    delete_connector_default_topics(conn_plugin_name)
    logger.info(f"start to create plugin: {conn_plugin_name}")

    plugin_arn = create_plugin(f"{conn_plugin_name}-{VERSION.replace('.', '-')}", bucket_name, zip_file_path).arn
    for c in range(12):
        time.sleep(5)
        if wait_until_plugin_created(plugin_arn):
            logger.info("plugins created successfully")
            break
        if c == 11:
            logger.error("the plugins did not create successfully, exit")
            exit(1)

    plugin_name = f"{conn_plugin_name}-{VERSION.replace('.', '-')}"
    write_github_env(plugin_arn, "PLUGIN_ARN")
    write_github_env(plugin_name, "PLUGIN_NAME")
    logger.info(f'{nowStr()} - Finished')
