/// <reference types="node" />
import Denque = require('denque');
import type { Readable } from 'stream';
import type { Document, Timestamp } from './bson';
import { Collection } from './collection';
import { AbstractCursor, AbstractCursorEvents, AbstractCursorOptions, CursorStreamOptions } from './cursor/abstract_cursor';
import { Db } from './db';
import { MongoClient } from './mongo_client';
import { InferIdType, TypedEventEmitter } from './mongo_types';
import { AggregateOptions } from './operations/aggregate';
import type { CollationOptions, OperationParent } from './operations/command';
import { ExecutionResult } from './operations/execute_operation';
import type { ReadPreference } from './read_preference';
import type { Topology } from './sdam/topology';
import type { ClientSession } from './sessions';
import { Callback, MongoDBNamespace } from './utils';
/**
 * Symbol key of the `ChangeStream` member holding the resume queue.
 * @internal
 */
declare const kResumeQueue: unique symbol;
/**
 * Symbol key of the optional `ChangeStream` member holding a `Readable`.
 * @internal
 */
declare const kCursorStream: unique symbol;
/**
 * Symbol key of the boolean `ChangeStream` member.
 * @internal
 */
declare const kClosed: unique symbol;
/**
 * Symbol key of the `ChangeStream` member typed `false | 'iterator' | 'emitter'`.
 * @internal
 */
declare const kMode: unique symbol;
/**
 * Option bag exposed by `ChangeStreamCursor.resumeOptions`. Every field is optional.
 * @public
 */
export interface ResumeOptions {
    /** Timestamp-typed starting point. */
    startAtOperationTime?: Timestamp;
    /** Batch size, as a number. */
    batchSize?: number;
    /** Maximum await time in milliseconds. */
    maxAwaitTimeMS?: number;
    /** Collation settings. */
    collation?: CollationOptions;
    /** Read preference. */
    readPreference?: ReadPreference;
}
/**
 * The logical position from which a new or resuming
 * {@link https://docs.mongodb.com/manual/changeStreams/#std-label-change-stream-resume| Change Stream}
 * starts on the server. Typed as `unknown`, so it must be narrowed before inspection.
 * @public
 */
export declare type ResumeToken = unknown;
/**
 * A specific point in time on a server, expressed as a BSON `Timestamp`.
 * One can be obtained through {@link Db#command}.
 * @public
 * @remarks
 * See {@link https://docs.mongodb.com/manual/reference/method/db.runCommand/#response| Run Command Response}
 */
export declare type OperationTime = Timestamp;
/**
 * Options accepted when piping.
 * @public
 */
export interface PipeOptions {
    /** NOTE(review): presumably mirrors the `end` flag of Node's `readable.pipe()` — confirm against the caller. */
    end?: boolean;
}
/**
 * Options that can be passed to a ChangeStream.
 *
 * `startAfter`, `resumeAfter`, and `startAtOperationTime` are mutually exclusive;
 * the server will error if more than one of them is specified.
 * @public
 */
export interface ChangeStreamOptions extends AggregateOptions {
    /**
     * Allowed values: 'default', 'updateLookup'. With 'updateLookup' the change stream
     * includes both a delta describing the changes to the document and a copy of the
     * entire changed document from some time after the change occurred.
     */
    fullDocument?: string;
    /** Upper bound on how long the server waits for new documents to satisfy a change stream query. */
    maxAwaitTimeMS?: number;
    /**
     * Start the change stream after a specified event.
     * See {@link https://docs.mongodb.com/manual/changeStreams/#resumeafter-for-change-streams|ChangeStream documentation}.
     */
    resumeAfter?: ResumeToken;
    /**
     * Like `resumeAfter`, but also allows starting after an invalidate event.
     * See {@link https://docs.mongodb.com/manual/changeStreams/#startafter-for-change-streams|ChangeStream documentation}.
     */
    startAfter?: ResumeToken;
    /** Start the change stream after the specified operationTime. */
    startAtOperationTime?: OperationTime;
    /**
     * Number of documents to return per batch.
     * See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
     */
    batchSize?: number;
}
/**
 * A single change notification delivered by a change stream.
 *
 * @typeParam TSchema - Shape of the documents stored in the watched namespace.
 * @public
 */
export interface ChangeStreamDocument<TSchema extends Document = Document> {
    /**
     * The id functions as an opaque token for use when resuming an interrupted
     * change stream.
     */
    _id: InferIdType<TSchema>;
    /**
     * Describes the type of operation represented in this change notification.
     */
    operationType: 'insert' | 'update' | 'replace' | 'delete' | 'invalidate' | 'drop' | 'dropDatabase' | 'rename';
    /**
     * Contains two fields: "db" and "coll" containing the database and
     * collection name in which the change happened.
     */
    ns: {
        db: string;
        coll: string;
    };
    /**
     * Only present for ops of type 'insert', 'update', 'replace', and
     * 'delete'.
     *
     * For unsharded collections this contains a single field, _id, with the
     * value of the _id of the document updated. For sharded collections,
     * this will contain all the components of the shard key in order,
     * followed by the _id if the _id isn't part of the shard key.
     */
    documentKey?: {
        _id: InferIdType<TSchema>;
    };
    /**
     * Only present for ops of type 'update'.
     *
     * Contains a description of updated and removed fields in this
     * operation.
     */
    updateDescription?: UpdateDescription<TSchema>;
    /**
     * Always present for operations of type 'insert' and 'replace'. Also
     * present for operations of type 'update' if the user has specified 'updateLookup'
     * in the 'fullDocument' arguments to the '$changeStream' stage.
     *
     * For operations of type 'insert' and 'replace', this key will contain the
     * document being inserted, or the new version of the document that is replacing
     * the existing document, respectively.
     *
     * For operations of type 'update', this key will contain a copy of the full
     * version of the document from some point after the update occurred. If the
     * document was deleted since the update happened, it will be null.
     */
    fullDocument?: TSchema;
}
/**
 * Describes which fields an 'update' operation changed and removed.
 *
 * @typeParam TSchema - Shape of the documents stored in the watched namespace.
 * @public
 */
export interface UpdateDescription<TSchema extends Document = Document> {
    /**
     * A document containing key:value pairs of names of the fields that were
     * changed, and the new value for those fields.
     */
    updatedFields: Partial<TSchema>;
    /**
     * An array of field names that were removed from the document.
     */
    removedFields: string[];
}
/**
 * Listener signatures for the events a change stream can emit, combined with
 * the events of the underlying abstract cursor.
 *
 * @typeParam TSchema - Shape of the documents stored in the watched namespace.
 * @public
 */
export declare type ChangeStreamEvents<TSchema extends Document = Document> = {
    resumeTokenChanged(token: ResumeToken): void;
    init(response: TSchema): void;
    more(response?: TSchema | undefined): void;
    response(): void;
    end(): void;
    error(error: Error): void;
    change(change: ChangeStreamDocument<TSchema>): void;
} & AbstractCursorEvents;
/**
 * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
 *
 * @typeParam TSchema - Shape of the documents stored in the watched namespace.
 * @public
 */
export declare class ChangeStream<TSchema extends Document = Document> extends TypedEventEmitter<ChangeStreamEvents<TSchema>> {
    /** Aggregation pipeline stages through which change stream documents are passed. */
    pipeline: Document[];
    options: ChangeStreamOptions;
    /** The object that created this change stream. */
    parent: MongoClient | Db | Collection;
    namespace: MongoDBNamespace;
    type: symbol;
    /** @internal */
    cursor?: ChangeStreamCursor<TSchema>;
    streamOptions?: CursorStreamOptions;
    /** @internal */
    [kResumeQueue]: Denque<Callback<ChangeStreamCursor<TSchema>>>;
    /** @internal */
    [kCursorStream]?: Readable;
    /** @internal */
    [kClosed]: boolean;
    /** @internal */
    [kMode]: false | 'iterator' | 'emitter';
    /** @event */
    static readonly RESPONSE: "response";
    /** @event */
    static readonly MORE: "more";
    /** @event */
    static readonly INIT: "init";
    /** @event */
    static readonly CLOSE: "close";
    /**
     * Fired for each new matching change in the specified namespace. Attaching a `change`
     * event listener to a Change Stream will switch the stream into flowing mode. Data will
     * then be passed as soon as it is available.
     * @event
     */
    static readonly CHANGE: "change";
    /** @event */
    static readonly END: "end";
    /** @event */
    static readonly ERROR: "error";
    /**
     * Emitted each time the change stream stores a new resume token.
     * @event
     */
    static readonly RESUME_TOKEN_CHANGED: "resumeTokenChanged";
    /**
     * @internal
     *
     * @param parent - The parent object that created this change stream
     * @param pipeline - An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
     * @param options - Options for the change stream
     */
    constructor(parent: OperationParent, pipeline?: Document[], options?: ChangeStreamOptions);
    /** @internal */
    get cursorStream(): Readable | undefined;
    /** The cached resume token that is used to resume after the most recently returned change. */
    get resumeToken(): ResumeToken;
    /** Check if there is any document still available in the Change Stream */
    hasNext(): Promise<boolean>;
    hasNext(callback: Callback<boolean>): void;
    /** Get the next available document from the Change Stream. */
    next(): Promise<ChangeStreamDocument<TSchema>>;
    next(callback: Callback<ChangeStreamDocument<TSchema>>): void;
    /** Is the cursor closed */
    get closed(): boolean;
    /** Close the Change Stream */
    close(callback?: Callback): Promise<void> | void;
    /**
     * Return a modified Readable stream including a possible transform method.
     * @throws MongoDriverError if this.cursor is undefined
     */
    stream(options?: CursorStreamOptions): Readable;
    /**
     * Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned
     */
    tryNext(): Promise<Document | null>;
    tryNext(callback: Callback<Document | null>): void;
}
/**
 * Options accepted by {@link ChangeStreamCursor}.
 * @internal
 */
export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
    startAtOperationTime?: OperationTime;
    resumeAfter?: ResumeToken;
    /**
     * Typed as a resume token, matching `ChangeStreamOptions.startAfter` and
     * `ChangeStreamCursor.startAfter`. It was previously declared `boolean`.
     */
    startAfter?: ResumeToken;
}
/**
 * Cursor over change stream documents that also tracks the resume position.
 *
 * @typeParam TSchema - Shape of the documents stored in the watched namespace.
 * @internal
 */
export declare class ChangeStreamCursor<TSchema extends Document = Document> extends AbstractCursor<ChangeStreamDocument<TSchema>, ChangeStreamEvents> {
    _resumeToken: ResumeToken;
    startAtOperationTime?: OperationTime;
    hasReceived?: boolean;
    resumeAfter: ResumeToken;
    startAfter: ResumeToken;
    options: ChangeStreamCursorOptions;
    postBatchResumeToken?: ResumeToken;
    pipeline: Document[];
    constructor(topology: Topology, namespace: MongoDBNamespace, pipeline?: Document[], options?: ChangeStreamCursorOptions);
    set resumeToken(token: ResumeToken);
    get resumeToken(): ResumeToken;
    get resumeOptions(): ResumeOptions;
    cacheResumeToken(resumeToken: ResumeToken): void;
    _processBatch(batchName: string, response?: Document): void;
    clone(): AbstractCursor<ChangeStreamDocument<TSchema>>;
    _initialize(session: ClientSession, callback: Callback<ExecutionResult | undefined>): void;
    _getMore(batchSize: number, callback: Callback): void;
}
export {};
//# sourceMappingURL=change_stream.d.ts.map