/// <reference types="node" />
import Denque = require('denque');
import type { Readable } from 'stream';
import type { Document, Timestamp } from './bson';
import { Collection } from './collection';
import { AbstractCursor, AbstractCursorEvents, AbstractCursorOptions, CursorStreamOptions } from './cursor/abstract_cursor';
import { Db } from './db';
import { MongoClient } from './mongo_client';
import { InferIdType, TypedEventEmitter } from './mongo_types';
import { AggregateOptions } from './operations/aggregate';
import type { CollationOptions, OperationParent } from './operations/command';
import { ExecutionResult } from './operations/execute_operation';
import type { ReadPreference } from './read_preference';
import type { Topology } from './sdam/topology';
import type { ClientSession } from './sessions';
import { Callback, MongoDBNamespace } from './utils';
/** @internal */
declare const kResumeQueue: unique symbol;
/** @internal */
declare const kCursorStream: unique symbol;
/** @internal */
declare const kClosed: unique symbol;
/** @internal */
declare const kMode: unique symbol;
/**
 * Shape returned by {@link ChangeStreamCursor.resumeOptions}.
 * @public
 */
export interface ResumeOptions {
    startAtOperationTime?: Timestamp;
    batchSize?: number;
    maxAwaitTimeMS?: number;
    collation?: CollationOptions;
    readPreference?: ReadPreference;
}
/**
 * Represents the logical starting point for a new or resuming {@link https://docs.mongodb.com/manual/changeStreams/#std-label-change-stream-resume| Change Stream} on the server.
 * @public
 */
export declare type ResumeToken = unknown;
/**
 * Represents a specific point in time on a server. Can be retrieved by using {@link Db#command}
 * @public
 * @remarks
 * See {@link https://docs.mongodb.com/manual/reference/method/db.runCommand/#response| Run Command Response}
 */
export declare type OperationTime = Timestamp;
/** @public */
export interface PipeOptions {
    end?: boolean;
}
/**
 * Options that can be passed to a ChangeStream.
 * Note that startAfter, resumeAfter, and startAtOperationTime are all mutually exclusive, and the server will error if more than one is specified.
 * @public
 */
export interface ChangeStreamOptions extends AggregateOptions {
    /** Allowed values: `default`, `updateLookup`. When set to `updateLookup`, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */
    fullDocument?: string;
    /** The maximum amount of time for the server to wait on new documents to satisfy a change stream query. */
    maxAwaitTimeMS?: number;
    /** Allows you to start a changeStream after a specified event. See {@link https://docs.mongodb.com/manual/changeStreams/#resumeafter-for-change-streams|ChangeStream documentation}. */
    resumeAfter?: ResumeToken;
    /** Similar to resumeAfter, but will allow you to start after an invalidated event. See {@link https://docs.mongodb.com/manual/changeStreams/#startafter-for-change-streams|ChangeStream documentation}. */
    startAfter?: ResumeToken;
    /** Will start the changeStream after the specified operationTime. */
    startAtOperationTime?: OperationTime;
    /** The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}. */
    batchSize?: number;
}
/**
 * A single change notification delivered by a change stream.
 * @typeParam TSchema - Shape of the documents in the watched namespace
 * @public
 */
export interface ChangeStreamDocument<TSchema extends Document = Document> {
    /**
     * The id functions as an opaque token for use when resuming an interrupted
     * change stream.
     */
    _id: InferIdType<TSchema>;
    /**
     * Describes the type of operation represented in this change notification.
     */
    operationType: 'insert' | 'update' | 'replace' | 'delete' | 'invalidate' | 'drop' | 'dropDatabase' | 'rename';
    /**
     * Contains two fields: `db` and `coll` containing the database and
     * collection name in which the change happened.
     */
    ns: {
        db: string;
        coll: string;
    };
    /**
     * Only present for ops of type `insert`, `update`, `replace`, and
     * `delete`.
     *
     * For unsharded collections this contains a single field, _id, with the
     * value of the _id of the document updated. For sharded collections,
     * this will contain all the components of the shard key in order,
     * followed by the _id if the _id isn't part of the shard key.
     */
    documentKey?: {
        _id: InferIdType<TSchema>;
    };
    /**
     * Only present for ops of type `update`.
     *
     * Contains a description of updated and removed fields in this
     * operation.
     */
    updateDescription?: UpdateDescription<TSchema>;
    /**
     * Always present for operations of type `insert` and `replace`. Also
     * present for operations of type `update` if the user has specified `updateLookup`
     * in the `fullDocument` arguments to the `$changeStream` stage.
     *
     * For operations of type `insert` and `replace`, this key will contain the
     * document being inserted, or the new version of the document that is replacing
     * the existing document, respectively.
     *
     * For operations of type `update`, this key will contain a copy of the full
     * version of the document from some point after the update occurred. If the
     * document was deleted since the update happened, it will be null.
     */
    fullDocument?: TSchema;
}
/**
 * Describes the fields changed and removed by an `update` operation.
 * @typeParam TSchema - Shape of the documents in the watched namespace
 * @public
 */
export interface UpdateDescription<TSchema extends Document = Document> {
    /**
     * A document containing key:value pairs of names of the fields that were
     * changed, and the new value for those fields.
     */
    updatedFields: Partial<TSchema>;
    /**
     * An array of field names that were removed from the document.
     */
    removedFields: string[];
}
/**
 * Event name to listener signature map for {@link ChangeStream}.
 * @public
 */
export declare type ChangeStreamEvents<TSchema extends Document = Document> = {
    resumeTokenChanged(token: ResumeToken): void;
    init(response: TSchema): void;
    more(response?: TSchema | undefined): void;
    response(): void;
    end(): void;
    error(error: Error): void;
    change(change: ChangeStreamDocument<TSchema>): void;
} & AbstractCursorEvents;
/**
 * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
 * @public
 */
export declare class ChangeStream<TSchema extends Document = Document> extends TypedEventEmitter<ChangeStreamEvents<TSchema>> {
    pipeline: Document[];
    options: ChangeStreamOptions;
    parent: MongoClient | Db | Collection;
    namespace: MongoDBNamespace;
    type: symbol;
    /** @internal */
    cursor?: ChangeStreamCursor<TSchema>;
    streamOptions?: CursorStreamOptions;
    /** @internal */
    [kResumeQueue]: Denque<Callback<ChangeStreamCursor<TSchema>>>;
    /** @internal */
    [kCursorStream]?: Readable;
    /** @internal */
    [kClosed]: boolean;
    /** @internal */
    [kMode]: false | 'iterator' | 'emitter';
    /** @event */
    static readonly RESPONSE: "response";
    /** @event */
    static readonly MORE: "more";
    /** @event */
    static readonly INIT: "init";
    /** @event */
    static readonly CLOSE: "close";
    /**
     * Fired for each new matching change in the specified namespace. Attaching a `change`
     * event listener to a Change Stream will switch the stream into flowing mode. Data will
     * then be passed as soon as it is available.
     * @event
     */
    static readonly CHANGE: "change";
    /** @event */
    static readonly END: "end";
    /** @event */
    static readonly ERROR: "error";
    /**
     * Emitted each time the change stream stores a new resume token.
     * @event
     */
    static readonly RESUME_TOKEN_CHANGED: "resumeTokenChanged";
    /**
     * @internal
     *
     * @param parent - The parent object that created this change stream
     * @param pipeline - An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
     */
    constructor(parent: OperationParent, pipeline?: Document[], options?: ChangeStreamOptions);
    /** @internal */
    get cursorStream(): Readable | undefined;
    /** The cached resume token that is used to resume after the most recently returned change. */
    get resumeToken(): ResumeToken;
    /** Check if there is any document still available in the Change Stream */
    hasNext(): Promise<boolean>;
    hasNext(callback: Callback<boolean>): void;
    /** Get the next available document from the Change Stream. */
    next(): Promise<ChangeStreamDocument<TSchema>>;
    next(callback: Callback<ChangeStreamDocument<TSchema>>): void;
    /** Is the cursor closed */
    get closed(): boolean;
    /** Close the Change Stream */
    close(callback?: Callback): Promise<void> | void;
    /**
     * Return a modified Readable stream including a possible transform method.
     * @throws MongoDriverError if this.cursor is undefined
     */
    stream(options?: CursorStreamOptions): Readable;
    /**
     * Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned
     */
    tryNext(): Promise<Document | null>;
    tryNext(callback: Callback<Document | null>): void;
}
/** @internal */
export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
    startAtOperationTime?: OperationTime;
    resumeAfter?: ResumeToken;
    /** Typed as a resume token to agree with `ChangeStreamOptions.startAfter` and `ChangeStreamCursor.startAfter`. */
    startAfter?: ResumeToken;
}
/** @internal */
export declare class ChangeStreamCursor<TSchema extends Document = Document> extends AbstractCursor<ChangeStreamDocument<TSchema>, ChangeStreamEvents> {
    _resumeToken: ResumeToken;
    startAtOperationTime?: OperationTime;
    hasReceived?: boolean;
    resumeAfter: ResumeToken;
    startAfter: ResumeToken;
    options: ChangeStreamCursorOptions;
    postBatchResumeToken?: ResumeToken;
    pipeline: Document[];
    constructor(topology: Topology, namespace: MongoDBNamespace, pipeline?: Document[], options?: ChangeStreamCursorOptions);
    set resumeToken(token: ResumeToken);
    get resumeToken(): ResumeToken;
    get resumeOptions(): ResumeOptions;
    cacheResumeToken(resumeToken: ResumeToken): void;
    _processBatch(batchName: string, response?: Document): void;
    clone(): AbstractCursor<ChangeStreamDocument<TSchema>>;
    _initialize(session: ClientSession, callback: Callback<ExecutionResult | undefined>): void;
    _getMore(batchSize: number, callback: Callback): void;
}
export {};
//# sourceMappingURL=change_stream.d.ts.map