from typing import Dict, List, Tuple
import boto3
from subprocess import run, PIPE
from dataclasses import dataclass
import json




_KAFKA_TOPIC_SCRIPT_PATH="kafka/bin"




@dataclass
class Config:
    kafkaBrokers: str
    awsRegion: str
    connectorPrefix: str
    dryRun: bool





def exec_command(command) -> Tuple[bool, str]:
    try:
        output = run(command, check=True, stdout=PIPE)

        return (True,output)
    except Exception as e:
        return (False,str(e))

def list_topics(bootstrap_servers) -> List[str]:
    command = [
            f'{_KAFKA_TOPIC_SCRIPT_PATH}/kafka-topics.sh',
            '--list',
            '--command-config', 'client.config',
            '--bootstrap-server', f'{bootstrap_servers}'
        ]
    
    status,output = exec_command(command) 
    if not status:
        print('List topics failed!')
        print(f'Reason: {output}')
        exit(1)
    topic_list = output.stdout.decode('ascii').split('\n')
    return topic_list

def delete_topics(topics_list, bootstrap_servers) -> None:
    for topic in topics_list:
        command = [
                f'{_KAFKA_TOPIC_SCRIPT_PATH}/kafka-topics.sh',
                '--command-config', 'client.config',
                '--bootstrap-server', f'{bootstrap_servers}',
                '--delete', '--topic', f'{topic}'
        ]
        print(f'Deleting topic {topic}')
        status,output = exec_command(command) 
        if not status:
            print('Delete topics failed!')
            print(f'Reason: {output}')
            exit(1)


if __name__ == "__main__":
    with open('config.json') as f:
        config: Config = Config(**json.load(f))
    print("Listing resources")
    topic_list = list_topics(config.kafkaBrokers)
    print("topic_list")
    ### Leave only internal topics in the list
    topic_deletion_list = [topic_name for topic_name in topic_list if config.connectorPrefix in topic_name]


    if config.dryRun:
        print('Topics to delete:')
        for topic in topic_deletion_list:
            print(topic)
    else:
        print("Deleting resources")
        delete_topics(topic_deletion_list,config.kafkaBrokers)
