///
import type { Readable } from 'stream';
import type { Binary, Document, Timestamp } from './bson';
import { Collection } from './collection';
import type { AbstractCursorEvents, CursorStreamOptions } from './cursor/abstract_cursor';
import { ChangeStreamCursor } from './cursor/change_stream_cursor';
import { Db } from './db';
import { MongoClient } from './mongo_client';
import { InferIdType, TypedEventEmitter } from './mongo_types';
import type { AggregateOptions } from './operations/aggregate';
import type { CollationOptions, OperationParent } from './operations/command';
import type { ReadPreference } from './read_preference';
import type { ServerSessionId } from './sessions';
import { Callback, MongoDBNamespace } from './utils';
/**
* Property key under which a ChangeStream holds its optional `Readable & AsyncIterable` stream.
* @internal
*/
declare const kCursorStream: unique symbol;
/**
* Property key under which a ChangeStream holds its boolean "closed" flag.
* @internal
*/
declare const kClosed: unique symbol;
/**
* Property key under which a ChangeStream holds its mode: `false`, `'iterator'` or `'emitter'`.
* @internal
*/
declare const kMode: unique symbol;
/**
 * Legacy bundle of the options that describe where and how a change stream starts.
 * @public
 * @deprecated Please use the ChangeStreamCursorOptions type instead.
 */
export interface ResumeOptions {
    /** Resume token of the event after which the stream should start. */
    resumeAfter?: ResumeToken;
    /** Resume token of the event after which the stream should start. */
    startAfter?: ResumeToken;
    /** Operation time after which the stream should start. */
    startAtOperationTime?: Timestamp;
    /** Value of the change stream `fullDocument` option. */
    fullDocument?: string;
    /** The number of documents to return per batch. */
    batchSize?: number;
    /** The maximum amount of time for the server to wait on new documents. */
    maxAwaitTimeMS?: number;
    /** Collation settings, see `CollationOptions`. */
    collation?: CollationOptions;
    /** Read preference, see `ReadPreference`. */
    readPreference?: ReadPreference;
}
/**
* Represents the logical starting point for a new ChangeStream or for resuming a ChangeStream on the server.
*
* Declared as `unknown`: change events describe their `_id` resume token as opaque, so a token
* should be handed back to the driver unchanged rather than inspected.
* @see https://www.mongodb.com/docs/manual/changeStreams/#std-label-change-stream-resume
* @public
*/
export declare type ResumeToken = unknown;
/**
* Represents a specific point in time on a server, expressed as a BSON `Timestamp`.
* Can be retrieved by using `db.command()`.
* @public
* @see https://docs.mongodb.com/manual/reference/method/db.runCommand/#response
*/
export declare type OperationTime = Timestamp;
/**
* Options accepted when piping a stream.
* @public
*/
export interface PipeOptions {
/**
* NOTE(review): presumably mirrors the `end` option of Node's `readable.pipe()`
* (whether the destination is ended when the source ends) — confirm against the caller.
*/
end?: boolean;
}
/**
 * Options that can be passed to a ChangeStream.
 *
 * `startAfter`, `resumeAfter` and `startAtOperationTime` are mutually exclusive: the server
 * will error if more than one of them is specified.
 * @public
 */
export interface ChangeStreamOptions extends AggregateOptions {
    /**
     * Allows you to start a changeStream after a specified event.
     * @see https://docs.mongodb.com/manual/changeStreams/#resumeafter-for-change-streams
     */
    resumeAfter?: ResumeToken;
    /**
     * Similar to resumeAfter, but will allow you to start after an invalidated event.
     * @see https://docs.mongodb.com/manual/changeStreams/#startafter-for-change-streams
     */
    startAfter?: ResumeToken;
    /** Will start the changeStream after the specified operationTime. */
    startAtOperationTime?: OperationTime;
    /**
     * Controls whether (and how) the changed document is attached to change events.
     * Allowed values: 'updateLookup', 'whenAvailable', 'required'.
     *
     * - 'updateLookup': the change notification for partial updates includes both a delta
     *   describing the changes to the document and a copy of the entire document that was
     *   changed, taken from some time after the change occurred.
     * - 'whenAvailable': replace and update change events carry the post-image of the
     *   modified document if the post-image for the event is available.
     * - 'required': same as 'whenAvailable', except that an error is raised if the
     *   post-image is not available.
     */
    fullDocument?: string;
    /**
     * Controls whether the pre-image of the changed document is attached to change events.
     * Allowed values: 'whenAvailable', 'required', 'off'.
     *
     * The default is to not send a value, which is equivalent to 'off'.
     *
     * - 'whenAvailable': replace, update, and delete change events carry the pre-image of
     *   the modified document if it is available.
     * - 'required': same as 'whenAvailable', except that an error is raised if the
     *   pre-image is not available.
     */
    fullDocumentBeforeChange?: string;
    /**
     * When enabled, configures the change stream to include extra change events.
     *
     * - createIndexes
     * - dropIndexes
     * - modify
     * - create
     * - shardCollection
     * - reshardCollection
     * - refineCollectionShardKey
     */
    showExpandedEvents?: boolean;
    /**
     * The number of documents to return per batch.
     * @see https://docs.mongodb.com/manual/reference/command/aggregate
     */
    batchSize?: number;
    /** The maximum amount of time for the server to wait on new documents to satisfy a change stream query. */
    maxAwaitTimeMS?: number;
}
/**
* Namespace (database name plus collection name) reported on a change event.
* @public
*/
export interface ChangeStreamNameSpace {
/** Name of the database. */
db: string;
/** Name of the collection. */
coll: string;
}
/**
 * Identifies the document that a change event refers to.
 *
 * @typeParam TSchema - Schema of the documents in the watched collection; the type of
 * `documentKey._id` is derived from it via `InferIdType`. Defaults to `Document`, so the
 * interface can still be referenced without type arguments.
 * @public
 */
export interface ChangeStreamDocumentKey<TSchema extends Document = Document> {
    /**
     * For unsharded collections this contains a single field `_id`.
     * For sharded collections, this will contain all the components of the shard key
     */
    documentKey: {
        _id: InferIdType<TSchema>;
        // Shard key components have arbitrary names and value types, hence the open index signature.
        [shardKey: string]: any;
    };
}
/**
 * Fields shared by every change event document.
 * @public
 */
export interface ChangeStreamDocumentCommon {
    /**
     * Opaque token identifying this event; use it when resuming an interrupted
     * change stream.
     */
    _id: ResumeToken;
    /**
     * The timestamp from the oplog entry associated with the event.
     *
     * Events that happened as part of a multi-document transaction all carry the same
     * clusterTime value, namely the time when the transaction was committed.
     * On a sharded cluster, events that occur on different shards can have the same
     * clusterTime but be associated with different transactions or even not be associated
     * with any transaction.
     * To identify events for a single transaction, use the combination of lsid and
     * txnNumber in the change stream event document.
     */
    clusterTime?: Timestamp;
    /**
     * The identifier for the session associated with the transaction.
     * Only present if the operation is part of a multi-document transaction.
     */
    lsid?: ServerSessionId;
    /**
     * The transaction number.
     * Only present if the operation is part of a multi-document transaction.
     *
     * **NOTE:** txnNumber can be a Long if promoteLongs is set to false
     */
    txnNumber?: number;
}
/**
* Mixin for change events that report the UUID of the affected collection.
* @public
*/
export interface ChangeStreamDocumentCollectionUUID {
/**
* The UUID (Binary subtype 4) of the collection that the operation was performed on.
*
* Only present when the `showExpandedEvents` flag is enabled.
*
* NOTE(review): the property is declared as required even though it is documented as
* conditionally present — confirm whether it should be optional.
*
* **NOTE:** collectionUUID will be converted to a NodeJS Buffer if the promoteBuffers
* flag is enabled.
*
* @since 6.1.0
*/
collectionUUID: Binary;
}
/**
* Mixin for change events that carry a description of the operation.
* @public
*/
export interface ChangeStreamDocumentOperationDescription {
/**
* A description of the operation.
*
* Only present when the `showExpandedEvents` flag is enabled.
*
* @since 6.1.0
*/
operationDescription?: Document;
}
/**
 * Change event with `operationType: 'insert'`.
 *
 * @typeParam TSchema - Schema of the documents in the watched collection; it is the type of
 * `fullDocument`. Defaults to `Document`.
 * @public
 * @see https://www.mongodb.com/docs/manual/reference/change-events/#insert-event
 */
export interface ChangeStreamInsertDocument<TSchema extends Document = Document> extends ChangeStreamDocumentCommon, ChangeStreamDocumentKey, ChangeStreamDocumentCollectionUUID {
    /** Describes the type of operation represented in this change notification */
    operationType: 'insert';
    /** This key will contain the document being inserted */
    fullDocument: TSchema;
    /** Namespace the insert event occurred on */
    ns: ChangeStreamNameSpace;
}
/**
 * Change event with `operationType: 'update'`.
 *
 * @typeParam TSchema - Schema of the documents in the watched collection; it is the type of
 * `fullDocument` and `fullDocumentBeforeChange`. Defaults to `Document`.
 * @public
 * @see https://www.mongodb.com/docs/manual/reference/change-events/#update-event
 */
export interface ChangeStreamUpdateDocument<TSchema extends Document = Document> extends ChangeStreamDocumentCommon, ChangeStreamDocumentKey, ChangeStreamDocumentCollectionUUID {
    /** Describes the type of operation represented in this change notification */
    operationType: 'update';
    /**
     * The modified document. Only set when the `fullDocument` option was specified when
     * creating the change stream:
     * - `'updateLookup'`: a copy of the entire document from some time after the change occurred.
     * - `'whenAvailable'` / `'required'`: the point-in-time post-image of the modified
     *   document, if the post-image is available.
     */
    fullDocument?: TSchema;
    /** Contains a description of updated and removed fields in this operation */
    updateDescription: UpdateDescription;
    /** Namespace the update event occurred on */
    ns: ChangeStreamNameSpace;
    /**
     * Contains the pre-image of the modified or deleted document if the
     * pre-image is available for the change event and either 'required' or
     * 'whenAvailable' was specified for the 'fullDocumentBeforeChange' option
     * when creating the change stream. If 'whenAvailable' was specified but the
     * pre-image is unavailable, this will be explicitly set to null.
     *
     * NOTE(review): the declared type does not include `null` — confirm whether it should.
     */
    fullDocumentBeforeChange?: TSchema;
}
/**
 * Change event with `operationType: 'replace'`.
 *
 * @typeParam TSchema - Schema of the documents in the watched collection; it is the type of
 * `fullDocument` and `fullDocumentBeforeChange`. Defaults to `Document`.
 * @public
 * @see https://www.mongodb.com/docs/manual/reference/change-events/#replace-event
 */
export interface ChangeStreamReplaceDocument<TSchema extends Document = Document> extends ChangeStreamDocumentCommon, ChangeStreamDocumentKey {
    /** Describes the type of operation represented in this change notification */
    operationType: 'replace';
    /** The fullDocument of a replace event represents the document after the insert of the replacement document */
    fullDocument: TSchema;
    /** Namespace the replace event occurred on */
    ns: ChangeStreamNameSpace;
    /**
     * Contains the pre-image of the modified or deleted document if the
     * pre-image is available for the change event and either 'required' or
     * 'whenAvailable' was specified for the 'fullDocumentBeforeChange' option
     * when creating the change stream. If 'whenAvailable' was specified but the
     * pre-image is unavailable, this will be explicitly set to null.
     *
     * NOTE(review): the declared type does not include `null` — confirm whether it should.
     */
    fullDocumentBeforeChange?: TSchema;
}
/**
 * Change event with `operationType: 'delete'`.
 *
 * @typeParam TSchema - Schema of the documents in the watched collection; it is the type of
 * `fullDocumentBeforeChange`. Defaults to `Document`.
 * @public
 * @see https://www.mongodb.com/docs/manual/reference/change-events/#delete-event
 */
export interface ChangeStreamDeleteDocument<TSchema extends Document = Document> extends ChangeStreamDocumentCommon, ChangeStreamDocumentKey, ChangeStreamDocumentCollectionUUID {
    /** Describes the type of operation represented in this change notification */
    operationType: 'delete';
    /** Namespace the delete event occurred on */
    ns: ChangeStreamNameSpace;
    /**
     * Contains the pre-image of the modified or deleted document if the
     * pre-image is available for the change event and either 'required' or
     * 'whenAvailable' was specified for the 'fullDocumentBeforeChange' option
     * when creating the change stream. If 'whenAvailable' was specified but the
     * pre-image is unavailable, this will be explicitly set to null.
     *
     * NOTE(review): the declared type does not include `null` — confirm whether it should.
     */
    fullDocumentBeforeChange?: TSchema;
}
/**
* Change event with `operationType: 'drop'`.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#drop-event
*/
export interface ChangeStreamDropDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'drop';
/** Namespace the drop event occurred on */
ns: ChangeStreamNameSpace;
}
/**
* Change event with `operationType: 'rename'`.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#rename-event
*/
export interface ChangeStreamRenameDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'rename';
/** The new name for the `ns.coll` collection, given as database name plus collection name */
to: {
db: string;
coll: string;
};
/** The "from" namespace that the rename occurred on */
ns: ChangeStreamNameSpace;
}
/**
* Change event with `operationType: 'dropDatabase'`.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#dropdatabase-event
*/
export interface ChangeStreamDropDatabaseDocument extends ChangeStreamDocumentCommon {
/** Describes the type of operation represented in this change notification */
operationType: 'dropDatabase';
/** The database dropped; only the database name is reported, there is no `coll` field */
ns: {
db: string;
};
}
/**
* Change event with `operationType: 'invalidate'`. It declares no fields beyond the common ones.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#invalidate-event
*/
export interface ChangeStreamInvalidateDocument extends ChangeStreamDocumentCommon {
/** Describes the type of operation represented in this change notification */
operationType: 'invalidate';
}
/**
* Change event with `operationType: 'createIndexes'`.
*
* Only present when the `showExpandedEvents` flag is enabled.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamCreateIndexDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentCollectionUUID, ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'createIndexes';
}
/**
* Change event with `operationType: 'dropIndexes'`.
*
* Only present when the `showExpandedEvents` flag is enabled.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamDropIndexDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentCollectionUUID, ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'dropIndexes';
}
/**
* Change event with `operationType: 'modify'`.
*
* Only present when the `showExpandedEvents` flag is enabled.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamCollModDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'modify';
}
/**
* Change event with `operationType: 'create'`.
*
* Only present when the `showExpandedEvents` flag is enabled.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamCreateDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'create';
}
/**
* Change event with `operationType: 'shardCollection'`.
*
* Only present when the `showExpandedEvents` flag is enabled.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamShardCollectionDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentCollectionUUID, ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'shardCollection';
}
/**
* Change event with `operationType: 'reshardCollection'`.
*
* Only present when the `showExpandedEvents` flag is enabled.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamReshardCollectionDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentCollectionUUID, ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'reshardCollection';
}
/**
* Change event with `operationType: 'refineCollectionShardKey'`.
*
* Only present when the `showExpandedEvents` flag is enabled.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamRefineCollectionShardKeyDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentCollectionUUID, ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'refineCollectionShardKey';
}
/**
 * Union of every change event shape declared in this file.
 * Each member has a distinct literal `operationType`, so that field can be used to narrow the union.
 * @public
 */
export declare type ChangeStreamDocument =
    | ChangeStreamInsertDocument
    | ChangeStreamUpdateDocument
    | ChangeStreamReplaceDocument
    | ChangeStreamDeleteDocument
    | ChangeStreamDropDocument
    | ChangeStreamRenameDocument
    | ChangeStreamDropDatabaseDocument
    | ChangeStreamInvalidateDocument
    | ChangeStreamCreateIndexDocument
    | ChangeStreamCreateDocument
    | ChangeStreamCollModDocument
    | ChangeStreamDropIndexDocument
    | ChangeStreamShardCollectionDocument
    | ChangeStreamReshardCollectionDocument
    | ChangeStreamRefineCollectionShardKeyDocument;
/**
 * Describes which parts of a document were changed by an update operation.
 *
 * @typeParam TSchema - Schema of the updated document; `updatedFields` is a partial view of it.
 * Defaults to `Document`, so the interface can still be referenced without type arguments.
 * @public
 */
export interface UpdateDescription<TSchema extends Document = Document> {
    /**
     * A document containing key:value pairs of names of the fields that were
     * changed, and the new value for those fields.
     */
    updatedFields?: Partial<TSchema>;
    /**
     * An array of field names that were removed from the document.
     */
    removedFields?: string[];
    /**
     * An array of documents which record array truncations performed with pipeline-based updates using one or more of the following stages:
     * - $addFields
     * - $set
     * - $replaceRoot
     * - $replaceWith
     */
    truncatedArrays?: Array<{
        /** The name of the truncated field. */
        field: string;
        /** The number of elements in the truncated array. */
        newSize: number;
    }>;
}
/**
 * Maps each event a ChangeStream can emit to the signature of its listeners.
 *
 * @typeParam TSchema - Schema of the documents in the watched collection. It is not used by
 * the listener signatures themselves; it is kept as the first parameter so the parameter
 * list lines up with that of `ChangeStream`.
 * @typeParam TChange - Type of the change documents handed to `change` listeners.
 * @public
 */
export declare type ChangeStreamEvents<TSchema extends Document = Document, TChange extends Document = ChangeStreamDocument> = {
    /** Emitted each time the change stream stores a new resume token. */
    resumeTokenChanged(token: ResumeToken): void;
    init(response: any): void;
    more(response?: any): void;
    response(): void;
    end(): void;
    error(error: Error): void;
    /** Fired for each new matching change in the specified namespace. */
    change(change: TChange): void;
} & AbstractCursorEvents;
/**
 * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
 *
 * @typeParam TSchema - Schema of the documents in the watched collection.
 * @typeParam TChange - Type of the change documents produced by `next()`, by `stream()` and
 * by the `change` event.
 * @public
 */
export declare class ChangeStream<TSchema extends Document = Document, TChange extends Document = ChangeStreamDocument> extends TypedEventEmitter<ChangeStreamEvents<TSchema, TChange>> {
    /** Aggregation pipeline stages through which change stream documents are passed. */
    pipeline: Document[];
    /** Options this change stream was configured with. */
    options: ChangeStreamOptions;
    /** The parent object that created this change stream. */
    parent: MongoClient | Db | Collection;
    namespace: MongoDBNamespace;
    type: symbol;
    /** @internal */
    cursor: ChangeStreamCursor;
    /** NOTE(review): presumably the options last passed to `stream()` — confirm against the implementation. */
    streamOptions?: CursorStreamOptions;
    /** @internal */
    [kCursorStream]?: Readable & AsyncIterable<TChange>;
    /** @internal */
    [kClosed]: boolean;
    /** @internal */
    [kMode]: false | 'iterator' | 'emitter';
    /** @event */
    static readonly RESPONSE: "response";
    /** @event */
    static readonly MORE: "more";
    /** @event */
    static readonly INIT: "init";
    /** @event */
    static readonly CLOSE: "close";
    /**
     * Fired for each new matching change in the specified namespace. Attaching a `change`
     * event listener to a Change Stream will switch the stream into flowing mode. Data will
     * then be passed as soon as it is available.
     * @event
     */
    static readonly CHANGE: "change";
    /** @event */
    static readonly END: "end";
    /** @event */
    static readonly ERROR: "error";
    /**
     * Emitted each time the change stream stores a new resume token.
     * @event
     */
    static readonly RESUME_TOKEN_CHANGED: "resumeTokenChanged";
    /**
     * @internal
     *
     * @param parent - The parent object that created this change stream
     * @param pipeline - An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
     * @param options - Options for the change stream, see `ChangeStreamOptions`
     */
    constructor(parent: OperationParent, pipeline?: Document[], options?: ChangeStreamOptions);
    /** @internal */
    get cursorStream(): (Readable & AsyncIterable<TChange>) | undefined;
    /** The cached resume token that is used to resume after the most recently returned change. */
    get resumeToken(): ResumeToken;
    /**
     * Check if there is any document still available in the Change Stream.
     * Returns a promise, or reports through `callback` when one is supplied.
     */
    hasNext(): Promise<boolean>;
    hasNext(callback: Callback<boolean>): void;
    /**
     * Get the next available document from the Change Stream.
     * Returns a promise, or reports through `callback` when one is supplied.
     */
    next(): Promise<TChange>;
    next(callback: Callback<TChange>): void;
    /**
     * Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned.
     * Returns a promise, or reports through `callback` when one is supplied.
     */
    tryNext(): Promise<Document | null>;
    tryNext(callback: Callback<Document | null>): void;
    /** Is the cursor closed */
    get closed(): boolean;
    /**
     * Close the Change Stream.
     * Returns a promise, or reports through `callback` when one is supplied.
     */
    close(): Promise<void>;
    close(callback: Callback): void;
    /**
     * Return a modified Readable stream including a possible transform method.
     *
     * NOTE: When using a Stream to process change stream events, the stream will
     * NOT automatically resume in the case a resumable error is encountered.
     *
     * @param options - Stream options, see `CursorStreamOptions`
     * @throws MongoChangeStreamError if the underlying cursor or the change stream is closed
     */
    stream(options?: CursorStreamOptions): Readable & AsyncIterable<TChange>;
    /** @internal */
    private _setIsEmitter;
    /** @internal */
    private _setIsIterator;
    /**
     * Create a new change stream cursor based on self's configuration
     * @internal
     */
    private _createChangeStreamCursor;
    /** @internal */
    private _closeEmitterModeWithError;
    /** @internal */
    private _streamEvents;
    /** @internal */
    private _endStream;
    /** @internal */
    private _processChange;
    /** @internal */
    private _processErrorStreamMode;
    /**
     * @internal
     *
     * TODO(NODE-4320): promisify selectServer and refactor this code to be async
     *
     * we promisify _processErrorIteratorModeCallback until we have a promisified version of selectServer.
     */
    private _processErrorIteratorMode;
    /** @internal */
    private _processErrorIteratorModeCallback;
}
export {};
//# sourceMappingURL=change_stream.d.ts.map