// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. import * as z from 'zod'; import StreamConsumers from 'stream/consumers'; import pmap from '../vendors/p-map'; import { AnonymousCredential, ContainerClient, StoragePipelineOptions, StorageSharedKeyCredential, } from '@azure/storage-blob'; import { Storage, StoreItems } from 'botbuilder-core'; import { ignoreError, isStatusCodeError } from './ignoreError'; import { sanitizeBlobKey } from './sanitizeBlobKey'; import { TokenCredential, isTokenCredential } from 'botbuilder-stdlib/lib/azureCoreHttpCompat'; /** * Optional settings for BlobsStorage */ export interface BlobsStorageOptions { /** * [StoragePipelineOptions](xref:@azure/storage-blob.StoragePipelineOptions) to pass to azure blob * storage client */ storagePipelineOptions?: StoragePipelineOptions; } function isCredentialType(value: any): value is TokenCredential { return ( isTokenCredential(value) || value instanceof StorageSharedKeyCredential || value instanceof AnonymousCredential ); } /** * BlobsStorage provides a [Storage](xref:botbuilder-core.Storage) implementation backed by Azure Blob Storage */ export class BlobsStorage implements Storage { private readonly _containerClient: ContainerClient; private readonly _concurrency = Infinity; private _initializePromise?: Promise; /** * Constructs a BlobsStorage instance. * * @param {string} connectionString Azure Blob Storage connection string * @param {string} containerName Azure Blob Storage container name * @param {BlobsStorageOptions} options Other options for BlobsStorage * @param {string} url Azure Blob Storage container url * @param {StorageSharedKeyCredential | AnonymousCredential | TokenCredential} credential Azure credential to access the resource */ constructor( connectionString: string, containerName: string, options?: BlobsStorageOptions, url = '', credential?: StorageSharedKeyCredential | AnonymousCredential | TokenCredential, ) { if (url != '' && credential != null) { z.object({ url: z.string() }).parse({ url, }); if (typeof credential != 'object' || !isCredentialType(credential)) { throw new ReferenceError('Invalid credential type.'); } this._containerClient = new ContainerClient(url, credential, options?.storagePipelineOptions); // At most one promise at a time to be friendly to local emulator users if (url.trim() === 'UseDevelopmentStorage=true;') { this._concurrency = 1; } } else { z.object({ connectionString: z.string(), containerName: z.string() }).parse({ connectionString, containerName, }); this._containerClient = new ContainerClient( connectionString, containerName, options?.storagePipelineOptions, ); // At most one promise at a time to be friendly to local emulator users if (connectionString.trim() === 'UseDevelopmentStorage=true;') { this._concurrency = 1; } } } // Protects against JSON.stringify cycles private toJSON(): unknown { return { name: 'BlobsStorage' }; } private _initialize(): Promise { if (!this._initializePromise) { this._initializePromise = this._containerClient.createIfNotExists(); } return this._initializePromise; } /** * Loads store items from storage. * * @param {string[]} keys Array of item keys to read * @returns {Promise} The fetched [StoreItems](xref:botbuilder-core.StoreItems) */ async read(keys: string[]): Promise { z.object({ keys: z.array(z.string()) }).parse({ keys }); await this._initialize(); return ( await pmap }>( keys, async (key) => { const result = { key, value: undefined }; const blob = await ignoreError( this._containerClient.getBlobClient(sanitizeBlobKey(key)).download(), isStatusCodeError(404), ); if (!blob) { return result; } const { etag: eTag, readableStreamBody } = blob; if (!readableStreamBody) { return result; } const parsed = (await StreamConsumers.json(readableStreamBody)) as any; result.value = { ...parsed, eTag }; return result; }, { concurrency: this._concurrency, }, ) ).reduce((acc, { key, value }) => (value ? { ...acc, [key]: value } : acc), {}); } /** * Saves store items to storage. * * @param {StoreItems} changes Map of [StoreItems](xref:botbuilder-core.StoreItems) to write to storage * @returns {Promise} A promise representing the async operation */ async write(changes: StoreItems): Promise { z.record(z.unknown()).parse(changes); await this._initialize(); await pmap( Object.entries(changes), async ([key, { eTag = '', ...change }]) => { try { const blob = this._containerClient.getBlockBlobClient(sanitizeBlobKey(key)); const serialized = JSON.stringify(change); return await blob.upload(serialized, serialized.length, { conditions: typeof eTag === 'string' && eTag !== '*' ? { ifMatch: eTag } : {}, blobHTTPHeaders: { blobContentType: 'application/json' }, }); } catch (err: any) { if (err.statusCode === 412) { throw new Error(`Storage: error writing "${key}" due to eTag conflict.`); } else { throw err; } } }, { concurrency: this._concurrency, }, ); } /** * Removes store items from storage. * * @param {string[]} keys Array of item keys to remove from the store * @returns {Promise} A promise representing the async operation */ async delete(keys: string[]): Promise { z.object({ keys: z.array(z.string()) }).parse({ keys }); await this._initialize(); await pmap( keys, (key) => ignoreError(this._containerClient.deleteBlob(sanitizeBlobKey(key)), isStatusCodeError(404)), { concurrency: this._concurrency, }, ); } }