/*!
 * Copyright (c) Microsoft Corporation. All rights reserved.
 * Licensed under the MIT License.
 */
/// <reference types="node" />
import { EventEmitter } from 'events';
import { InterceptorProcessor } from '../interceptor';
import { ActiveTask } from '../task';
import { RecursiveRequired } from '../types/internal';
import TaskClient from './client';
import CosmosDbClient from './cosmos';
import { ListenOptions, TaskHandler } from './types';

/**
 * Concrete {@link Listener} declared by this module: an `EventEmitter` that
 * polls for tasks of a single type and hands them to a user-provided
 * handler.
 *
 * NOTE(review): `TaskHandler` and `ActiveTask` are referenced without type
 * arguments in this declaration; if they are generic in their source
 * modules, the arguments need to be restored here as well - confirm against
 * `./types` and `../task`.
 */
export default class ListenerImpl extends EventEmitter implements Listener {
    /**
     * Total count of active tasks being processed by the listener.
     */
    get activeTaskCount(): number;

    /**
     * Whether the listener is actively running or not.
     */
    get running(): boolean;

    /**
     * Number of additional tasks that can be processed right now on top of our
     * current load.
     */
    private get _capacity();

    /**
     * Task client.
     */
    private _taskClient;

    /**
     * Cosmos DB client.
     */
    private _client;

    /**
     * Interceptor for passing into tasks.
     */
    private _interceptor;

    /**
     * Handler provided by the user for processing tasks.
     */
    private _handler;

    /**
     * Type of tasks to listen for
     */
    private _type;

    /**
     * Options for configuring the listener provided by the user.
     */
    private _options;

    /**
     * Map of tasks that are actively being processed by their task ids
     */
    private _activeTasks;

    /**
     * If true, indicates that the task processor will continue to pick up new
     * tasks for processing as they become available.
     */
    private _running;

    /**
     * True if we're actively looking for new tasks to process.
     */
    private _polling;

    /**
     * The number of consecutive times that we have failed to poll for new tasks
     */
    private _consecutivePollFailures;

    /**
     * If true, the listener is torn down and can no longer be used.
     */
    private _destroyed;

    /**
     * This will be defined if there is a timer set to attempt to fetch more
     * tasks in the future. Also doubles as the timer ref to cancel if
     * necessary.
     */
    private _pollTimer?;

    /**
     * @param taskClient Task client.
     * @param client Cosmos DB client.
     * @param interceptor Interceptor for passing into tasks.
     * @param type Type of tasks to listen for.
     * @param handler Handler provided by the user for processing tasks.
     * @param options Options for configuring the listener provided by the
     * user.
     */
    constructor(
        taskClient: TaskClient,
        client: CosmosDbClient,
        interceptor: InterceptorProcessor,
        type: string,
        handler: TaskHandler,
        options: RecursiveRequired<ListenOptions>
    );

    /**
     * Start processing tasks from the listener. See {@link Listener.start}.
     */
    start(): void;

    /**
     * Stop the listener from processing new tasks. See {@link Listener.stop}.
     */
    stop(): void;

    /**
     * Stop processing and release all in-flight tasks. See
     * {@link Listener.destroy}.
     */
    destroy(): Promise<void>;

    /**
     * Main polling loop. Attempts to lock tasks and passes them off to the
     * handler for processing, scheduling the next poll depending on the tasks
     * returned and available capacity.
     */
    private _poll;

    /**
     * Attempts to lock up to the specified number of tasks in the database,
     * returning the tasks it was able to lock along with a flag indicating
     * whether there may be more tasks on the server available to lock.
     *
     * @param maxCount Maximum number of tasks we're willing to lock in this
     * call
     */
    private _lockTasks;

    /**
     * Handler for listening to a task.
     *
     * @param task Task instance to process
     */
    private _handle;

    /**
     * Schedules a future attempt to poll for tasks to lock, ensuring that there
     * are no other tasks going on. The provided delay is adjusted by a small
     * fuzzy factor to help avoid undue races between clients.
     *
     * @param delayMs Amount of time to wait in milliseconds before the next
     * polling attempt.
     */
    private _schedulePoll;

    /**
     * Checks and executes the poll if it isn't already running. Used when there
     * is a trigger that indicates that we may have more capacity to process new
     * tasks.
     */
    private _pollNow;

    /**
     * Clear out the current poll timer, if one is present
     */
    private _clearPoll;

    /**
     * Forcefully release all tasks from processing. This is only used when
     * destroying the listener.
     */
    private _releaseAllTasks;

    /**
     * Run the completion function on the task, waiting for it to enter an
     * appropriate state if needed. If it is already finished, then do nothing.
     *
     * @param task The task to wait for
     */
    private _finish;
}

/**
 * Listener for processing tasks of a single type. It is generally created by
 * calling {@link TaskClient.listen}.
 *
 * @public
 */
export interface Listener extends EventEmitter {
    /**
     * Number indicating the total count of active tasks being processed by the
     * listener. Useful if you want to check processor load or if the processor
     * is idling.
     *
     * @public
     */
    readonly activeTaskCount: number;

    /**
     * Boolean indicating whether the listener is actively running or not.
     *
     * @public
     */
    readonly running: boolean;

    /**
     * Start processing tasks from the listener. Useful if you want to pause and
     * resume the task processing using some custom logic. Does nothing if the
     * subscription is already running.
     *
     * @public
     */
    start(): void;

    /**
     * Stop the listener from processing new tasks, but allow any currently
     * running tasks to complete if desired. Does nothing if the subscription is
     * already stopped.
     *
     * @public
     */
    stop(): void;

    /**
     * Stop processing tasks from the listener AND terminate processing of any
     * tasks that are currently in-flight by releasing them. Returns a promise
     * which resolves once all tasks have been released. Unlike {@link
     * Listener.start} and {@link Listener.stop}, this is _not_ idempotent.
     * Calls to destroy an already destroyed client will result in an error to
     * avoid race conditions during shutdown.
     *
     * @public
     */
    destroy(): Promise<void>;

    /**
     * Subscribe to the `'polled'` event; the listener receives no arguments.
     */
    on(event: 'polled', listener: () => void): this;

    /**
     * Subscribe to the `'pollingStuck'` event; the listener receives the
     * associated error value.
     */
    on(event: 'pollingStuck', listener: (err: any) => void): this;

    /**
     * Subscribe to the `'finishedTask'` event; the listener receives the
     * task in question.
     */
    on(event: 'finishedTask', listener: (task: ActiveTask) => void): this;
}
//# sourceMappingURL=listener.d.ts.map