# Admin

The `KafkaAdminHelper` wraps `@platformatic/kafka`'s `Admin` with health tracking, graceful shutdown, and broker event callbacks. Use `getAdmin()` to access the full Admin API directly.

```typescript
class KafkaAdminHelper extends BaseKafkaHelper<Admin>
```

> [!NOTE]
> `KafkaAdminHelper` has **no generic type parameters** -- the Admin client does not deal with serialized messages.

## Helper API

| Method | Signature | Description |
|--------|-----------|-------------|
| `newInstance(opts)` | `static newInstance(opts): KafkaAdminHelper` | Factory method |
| `getAdmin()` | `(): Admin` | Access the underlying `Admin` |
| `isHealthy()` | `(): boolean` | `true` when broker connected |
| `isReady()` | `(): boolean` | Same as `isHealthy()` |
| `getHealthStatus()` | `(): TKafkaHealthStatus` | `'connected'` \| `'disconnected'` \| `'unknown'` |
| `close(opts?)` | `(opts?: { isForce?: boolean }): Promise<void>` | Close the admin connection (default: graceful) |

## IKafkaAdminOptions

```typescript
interface IKafkaAdminOptions extends IKafkaConnectionOptions {
  identifier?: string;       // Default: 'kafka-admin'
  shutdownTimeout?: number;  // Default: 30000ms
  onBrokerConnect?: TKafkaBrokerEventCallback;
  onBrokerDisconnect?: TKafkaBrokerEventCallback;
}
```

Plus all [Connection Options](./#connection-options).

## Basic Example

```typescript
import { KafkaAdminHelper } from '@venizia/ignis-helpers/kafka';

const helper = KafkaAdminHelper.newInstance({
  bootstrapBrokers: ['localhost:9092'],
  clientId: 'my-admin',
  onBrokerConnect: ({ broker }) => console.log(`Connected to ${broker.host}:${broker.port}`),
  onBrokerDisconnect: ({ broker }) => console.log(`Disconnected from ${broker.host}`),
});

const admin = helper.getAdmin();

// Create a topic
await admin.createTopics({ topics: ['orders'], partitions: 3, replicas: 2 });

// List all topics
const topics = await admin.listTopics();

// Health check
helper.isHealthy(); // true when connected

// Graceful close
await helper.close();

// Or force close
await helper.close({ isForce: true });
```

## Graceful Shutdown

`close()` uses the base `closeClient()` (which calls `this.client.close()`) with a graceful timeout. If the graceful close exceeds `shutdownTimeout` (default 30s), it automatically force-closes. After `close()`, `healthStatus` is set to `'disconnected'`.

```typescript
// Graceful (recommended)
await helper.close();

// Force
await helper.close({ isForce: true });
```

## API Reference (`@platformatic/kafka`)

After calling `helper.getAdmin()`, you have full access to the `Admin` class.

### Topic Management

| Method | Signature | Description |
|--------|-----------|-------------|
| `createTopics(opts)` | `(opts: { topics: string[], partitions?: number, replicas?: number, configs?: Config[] }): Promise<CreatedTopic[]>` | Create topics |
| `deleteTopics(opts)` | `(opts: { topics: string[] }): Promise<void>` | Delete topics |
| `listTopics(opts?)` | `(opts?: { includeInternals?: boolean }): Promise<string[]>` | List all topics |
| `createPartitions(opts)` | `(opts: { topics: CreatePartitionsRequestTopic[], validateOnly?: boolean }): Promise<void>` | Add partitions to existing topics |
| `deleteRecords(opts)` | `(opts: { topics: { name, partitions: { partition, offset }[] }[] }): Promise<DeletedRecordsTopic[]>` | Delete records up to offset |

```typescript
// Create with custom configuration
await admin.createTopics({
  topics: ['orders'],
  partitions: 6,
  replicas: 3,
  configs: [
    { name: 'retention.ms', value: '604800000' },   // 7 days
    { name: 'cleanup.policy', value: 'compact' },
    { name: 'compression.type', value: 'zstd' },
  ],
});

// Add partitions (can only increase, never decrease)
await admin.createPartitions({
  topics: [{ name: 'orders', count: 12 }],
});

// Delete records before offset 1000 on partition 0
await admin.deleteRecords({
  topics: [{ name: 'orders', partitions: [{ partition: 0, offset: 1000n }] }],
});
```

### Consumer Group Management

| Method | Signature | Description |
|--------|-----------|-------------|
| `listGroups(opts?)` | `(opts?: { states?: string[], types?: string[] }): Promise<Map<string, GroupBase>>` | List consumer groups |
| `describeGroups(opts)` | `(opts: { groups: string[] }): Promise<Map<string, Group>>` | Describe consumer groups (members, assignments) |
| `deleteGroups(opts)` | `(opts: { groups: string[] }): Promise<void>` | Delete consumer groups |
| `removeMembersFromConsumerGroup(opts)` | `(opts: { groupId, members? }): Promise<void>` | Remove specific members |

```typescript
// List all active groups
const groups = await admin.listGroups({ states: ['STABLE'] });

// Describe group members and partition assignments
const details = await admin.describeGroups({ groups: ['order-processing'] });
for (const [groupId, group] of details) {
  console.log(`Group: ${groupId}, State: ${group.state}`);
  for (const [memberId, member] of group.members) {
    console.log(`  Member: ${member.clientId} (${member.clientHost})`);
  }
}
```

### Offset Management

| Method | Signature | Description |
|--------|-----------|-------------|
| `listOffsets(opts)` | `(opts): Promise<ListedOffsetsTopic[]>` | List partition offsets at timestamps |
| `listConsumerGroupOffsets(opts)` | `(opts: { groups }): Promise<ListConsumerGroupOffsetsGroup[]>` | List committed offsets for groups |
| `alterConsumerGroupOffsets(opts)` | `(opts: { groupId, topics }): Promise<void>` | Reset/alter committed offsets |
| `deleteConsumerGroupOffsets(opts)` | `(opts: { groupId, topics }): Promise<...>` | Delete committed offsets |

```typescript
// Reset consumer group offsets to earliest
await admin.alterConsumerGroupOffsets({
  groupId: 'order-processing',
  topics: [{
    name: 'orders',
    partitionOffsets: [
      { partition: 0, offset: 0n },
      { partition: 1, offset: 0n },
      { partition: 2, offset: 0n },
    ],
  }],
});
```

### Configuration Management

| Method | Signature | Description |
|--------|-----------|-------------|
| `describeConfigs(opts)` | `(opts: { resources, includeSynonyms?, includeDocumentation? }): Promise<ConfigDescription[]>` | Describe broker/topic configurations |
| `alterConfigs(opts)` | `(opts: { resources, validateOnly? }): Promise<void>` | Replace topic/broker configs |
| `incrementalAlterConfigs(opts)` | `(opts: { resources, validateOnly? }): Promise<void>` | Incrementally modify configs |

### ACL Management

| Method | Signature | Description |
|--------|-----------|-------------|
| `createAcls(opts)` | `(opts: { creations: Acl[] }): Promise<void>` | Create access control lists |
| `describeAcls(opts)` | `(opts: { filter: AclFilter }): Promise<DescribeAclsResponseResource[]>` | Describe ACLs |
| `deleteAcls(opts)` | `(opts: { filters: AclFilter[] }): Promise<Acl[]>` | Delete ACLs |

### Quota Management

| Method | Signature | Description |
|--------|-----------|-------------|
| `describeClientQuotas(opts)` | `(opts): Promise<DescribeClientQuotasResponseEntry[]>` | Describe client quotas |
| `alterClientQuotas(opts)` | `(opts): Promise<AlterClientQuotasResponseEntries[]>` | Alter client quotas |

### Log Management

| Method | Signature | Description |
|--------|-----------|-------------|
| `describeLogDirs(opts)` | `(opts: { topics }): Promise<BrokerLogDirDescription[]>` | Describe broker log directories |
