/**
 * @file Manages query for records in Salesforce
 * @author Shinichi Tomita
 */
import { EventEmitter } from 'events';
import { Logger, getLogger } from './util/logger';
import RecordStream, { Serializable } from './record-stream';
import Connection from './connection';
import { createSOQL } from './soql-builder';
import { QueryConfig as SOQLQueryConfig, SortDir } from './soql-builder';
import {
  Record,
  Optional,
  Schema,
  SObjectNames,
  ChildRelationshipNames,
  ChildRelationshipSObjectName,
  FieldProjectionConfig,
  FieldPathSpecifier,
  FieldPathScopedProjection,
  SObjectRecord,
  SObjectInputRecord,
  SObjectUpdateRecord,
  SaveResult,
  DateString,
  SObjectChildRelationshipProp,
  SObjectFieldNames,
} from './types';
import { Readable } from 'stream';
import SfDate from './date';
import { IngestJobV2Results } from './api/bulk2';

/**
 * Field selection accepted by `select()`: a single field path, a list of
 * paths, a comma separated string, or an object whose truthy values mark
 * the fields to include.
 */
export type QueryField<
  S extends Schema,
  N extends SObjectNames<S>,
  FP extends FieldPathSpecifier<S, N> = FieldPathSpecifier<S, N>
> = FP | FP[] | string | string[] | { [field: string]: number | boolean };

/**
 * Value type usable in a condition for a field of type `T`
 * (date strings are compared through `SfDate`).
 */
type CValue<T> = T extends DateString
  ? SfDate
  : T extends string | number | boolean
  ? T
  : never;

/**
 * Operator / operand pairs supported in a condition object.
 */
type CondOp<T> =
  | ['$eq', CValue<T> | null]
  | ['$ne', CValue<T> | null]
  | ['$gt', CValue<T>]
  | ['$gte', CValue<T>]
  | ['$lt', CValue<T>]
  | ['$lte', CValue<T>]
  | ['$like', T extends string ? T : never]
  | ['$nlike', T extends string ? T : never]
  | ['$in', Array<CValue<T>>]
  | ['$nin', Array<CValue<T>>]
  | ['$includes', T extends string ? T[] : never]
  | ['$excludes', T extends string ? T[] : never]
  | ['$exists', boolean];

/**
 * Object form of a condition, e.g. `{ $gt: 10 }`, derived from `CondOp`.
 */
type CondValueObj<T, Op = CondOp<T>[0]> = Op extends CondOp<T>[0]
  ? Op extends string
    ? { [K in Op]: Extract<CondOp<T>, [Op, any]>[1] }
    : never
  : never;

type CondValue<T> = CValue<T> | Array<CValue<T>> | null | CondValueObj<T>;

type ConditionSet<R extends Record> = {
  [K in keyof R]?: CondValue<R[K]>;
};

/**
 * Condition tree accepted by `where()`: a per-field condition set, or a
 * `$or` / `$and` combination of nested conditions.
 */
export type QueryCondition<S extends Schema, N extends SObjectNames<S>> =
  | {
      $or: Array<QueryCondition<S, N>>;
    }
  | {
      $and: Array<QueryCondition<S, N>>;
    }
  | ConditionSet<SObjectRecord<S, N>>;

/**
 * Sort specification: a field-to-direction map or a list of
 * `[field, direction]` tuples.
 */
export type QuerySort<
  S extends Schema,
  N extends SObjectNames<S>,
  R extends SObjectRecord<S, N> = SObjectRecord<S, N>
> =
  | {
      [K in keyof R]?: SortDir;
    }
  | Array<[keyof R, SortDir]>;

/**
 * Declarative query configuration accepted by the `Query` constructor.
 */
export type QueryConfig<
  S extends Schema,
  N extends SObjectNames<S>,
  FP extends FieldPathSpecifier<S, N> = FieldPathSpecifier<S, N>
> = {
  fields?: QueryField<S, N, FP>;
  includes?: {
    [CRN in ChildRelationshipNames<S, N>]?: QueryConfig<
      S,
      ChildRelationshipSObjectName<S, N, CRN>
    >;
  };
  table?: string;
  conditions?: QueryCondition<S, N>;
  sort?: QuerySort<S, N>;
  limit?: number;
  offset?: number;
};

/**
 * Execution options of a query.
 */
export type QueryOptions = {
  headers: { [name: string]: string };
  maxFetch: number;
  autoFetch: boolean;
  scanAll: boolean;
  responseTarget: QueryResponseTarget;
};

/**
 * One page of a query response.
 */
export type QueryResult<R extends Record> = {
  done: boolean;
  totalSize: number;
  records: R[];
  nextRecordsUrl?: string;
};

/**
 * Response shape of the query "explain" request.
 */
export type QueryExplainResult = {
  plans: Array<{
    cardinality: number;
    fields: string[];
    leadingOperationType: 'Index' | 'Other' | 'Sharing' | 'TableScan';
    notes: Array<{
      description: string;
      fields: string[];
      tableEnumOrId: string;
    }>;
    relativeCost: number;
    sobjectCardinality: number;
    sobjectType: string;
  }>;
};

const ResponseTargetValues = [
  'QueryResult',
  'Records',
  'SingleRecord',
  'Count',
] as const;

export type QueryResponseTarget = typeof ResponseTargetValues[number];

/**
 * Map of every response target name to itself, usable as an enum-like object.
 */
export const ResponseTargets: {
  [K in QueryResponseTarget]: K;
} = ResponseTargetValues.reduce(
  (values, target) => ({ ...values, [target]: target }),
  {} as {
    [K in QueryResponseTarget]: K;
  },
);

/**
 * Value a query resolves to, depending on its response target.
 */
export type QueryResponse<
  R extends Record,
  QRT extends QueryResponseTarget = QueryResponseTarget
> = QRT extends 'QueryResult'
  ? QueryResult<R>
  : QRT extends 'Records'
  ? R[]
  : QRT extends 'SingleRecord'
  ? R | null
  : number; // QRT extends 'Count'

export type BulkApiVersion = 1 | 2;

export type QueryDestroyOptions = {
  allowBulk?: boolean;
  bulkThreshold?: number;
  bulkApiVersion?: BulkApiVersion;
};

export type QueryUpdateOptions = {
  allowBulk?: boolean;
  bulkThreshold?: number;
  bulkApiVersion?: BulkApiVersion;
  /**
   * Skip record template evaluation.
   */
  skipRecordTemplateEval?: boolean;
};

/**
 * Defaults used by `Query#destroy()` / `Query#update()` when the caller does
 * not specify a bulk threshold or bulk API version.
 */
const DEFAULT_BULK_THRESHOLD = 200;
const DEFAULT_BULK_API_VERSION = 1;

/**
 * Query
 *
 * Builds (or wraps) a SOQL query, executes it through the connection and
 * exposes the result as events (`fetch`, `record`, `response`, `end`,
 * `error`), as a record stream, and as a thenable.
 */
export class Query<
  S extends Schema,
  N extends SObjectNames<S>,
  R extends Record = Record,
  QRT extends QueryResponseTarget = QueryResponseTarget
> extends EventEmitter {
  static _logger = getLogger('query');

  _conn: Connection<S>;
  _logger: Logger;
  _soql: Optional<string>;
  _locator: Optional<string>;
  _config: SOQLQueryConfig = {};
  _children: Array<SubQuery<S, N, R, QRT, any, any, any>> = [];
  _options: QueryOptions;
  _executed: boolean = false;
  _finished: boolean = false;
  _chaining: boolean = false;
  _promise: Promise<QueryResponse<R, QRT>>;
  _stream: Serializable<R>;

  totalSize = 0;
  totalFetched = 0;
  records: R[] = [];

  /**
   * @param conn - Connection used to issue the requests
   * @param config - A raw SOQL string, a `{ locator }` object pointing to a
   *   previously started query, or a declarative query config
   * @param options - Overrides for the default query options
   */
  constructor(
    conn: Connection<S>,
    config: string | QueryConfig<S, N> | { locator: string },
    options?: Partial<QueryOptions>,
  ) {
    super();
    this._conn = conn;
    this._logger = conn._logLevel
      ? Query._logger.createInstance(conn._logLevel)
      : Query._logger;
    if (typeof config === 'string') {
      this._soql = config;
      this._logger.debug(`config is soql: ${config}`);
    } else if (typeof (config as any).locator === 'string') {
      const locator: string = (config as any).locator;
      this._logger.debug(`config is locator: ${locator}`);
      // a locator containing "/" is treated as a URL; keep its last segment
      this._locator = locator.includes('/')
        ? this.urlToLocator(locator)
        : locator;
    } else {
      this._logger.debug(`config is QueryConfig: ${JSON.stringify(config)}`);
      const { fields, includes, sort, ..._config } = config as QueryConfig<
        S,
        N
      >;
      this._config = _config;
      this.select(fields);
      if (includes) {
        this.includeChildren(includes);
      }
      if (sort) {
        this.sort(sort);
      }
    }
    this._options = {
      headers: {},
      maxFetch: 10000,
      autoFetch: false,
      scanAll: false,
      responseTarget: 'QueryResult',
      ...(options || {}),
    } as QueryOptions;
    // promise instance
    this._promise = new Promise((resolve, reject) => {
      this.on('response', resolve);
      this.on('error', reject);
    });
    this._stream = new Serializable();
    this.on('record', (record) => this._stream.push(record));
    this.on('end', () => this._stream.push(null));
    this.on('error', (err) => {
      // best effort: forwarding must not throw out of the query's own
      // 'error' handler, so any exception raised by the stream is ignored
      try {
        this._stream.emit('error', err);
      } catch (e) {
        // eslint-disable-line no-empty
      }
    });
  }

  /**
   * Select fields to include in the returning result
   *
   * @param fields - Field specification; defaults to `'*'`
   * @returns This instance, retyped to the projected record type
   * @throws Error when the query was created from a raw SOQL string
   */
  select<
    R extends Record = Record,
    FP extends FieldPathSpecifier<S, N> = FieldPathSpecifier<S, N>,
    FPC extends FieldProjectionConfig = FieldPathScopedProjection<S, N, FP>,
    R2 extends SObjectRecord<S, N, FPC, R> = SObjectRecord<S, N, FPC, R>
  >(fields: QueryField<S, N, FP> = '*'): Query<S, N, R2, QRT> {
    if (this._soql) {
      throw Error(
        'Cannot set select fields for the query which has already built SOQL.',
      );
    }
    // Flatten any accepted field specification into a list of field paths.
    // Nested objects are expanded to dotted paths; falsy flags drop the field.
    function toFieldArray(fields: QueryField<S, N, FP>): string[] {
      return typeof fields === 'string'
        ? fields.split(/\s*,\s*/)
        : Array.isArray(fields)
        ? (fields as Array<string | FP>)
            .map(toFieldArray)
            .reduce((fs, f) => [...fs, ...f], [])
        : Object.entries(fields as { [name: string]: QueryField<S, N, FP> })
            .map(([f, v]) => {
              if (typeof v === 'number' || typeof v === 'boolean') {
                return v ? [f] : [];
              } else {
                return toFieldArray(v).map((p) => `${f}.${p}`);
              }
            })
            .reduce((fs, f) => [...fs, ...f], []);
    }
    if (fields) {
      this._config.fields = toFieldArray(fields);
    }
    // force convert query record type without changing instance;
    return (this as any) as Query<S, N, R2, QRT>;
  }

  /**
   * Set query conditions to filter the result records
   *
   * @throws Error when the query was created from a raw SOQL string
   */
  where(conditions: QueryCondition<S, N> | string) {
    if (this._soql) {
      throw Error(
        'Cannot set where conditions for the query which has already built SOQL.',
      );
    }
    this._config.conditions = conditions;
    return this;
  }

  /**
   * Limit the returning result
   *
   * @throws Error when the query was created from a raw SOQL string
   */
  limit(limit: number) {
    if (this._soql) {
      throw Error(
        'Cannot set limit for the query which has already built SOQL.',
      );
    }
    this._config.limit = limit;
    return this;
  }

  /**
   * Skip records
   *
   * @throws Error when the query was created from a raw SOQL string
   */
  skip(offset: number) {
    if (this._soql) {
      throw Error(
        'Cannot set skip/offset for the query which has already built SOQL.',
      );
    }
    this._config.offset = offset;
    return this;
  }

  /**
   * Synonym of Query#skip()
   */
  offset = this.skip;

  /**
   * Set query sort with direction
   *
   * Accepts either a full sort specification, or a single field name
   * together with its direction.
   *
   * @throws Error when the query was created from a raw SOQL string
   */
  sort(sort: QuerySort<S, N> | string): this;
  sort(sort: SObjectFieldNames<S, N> | string, dir: SortDir): this;
  sort(
    sort: QuerySort<S, N> | SObjectFieldNames<S, N> | string,
    dir?: SortDir,
  ) {
    if (this._soql) {
      throw Error(
        'Cannot set sort for the query which has already built SOQL.',
      );
    }
    if (typeof sort === 'string' && typeof dir !== 'undefined') {
      this._config.sort = [[sort, dir]];
    } else {
      this._config.sort = sort as string | { [field: string]: SortDir };
    }
    return this;
  }

  /**
   * Synonym of Query#sort()
   */
  orderby: typeof Query.prototype.sort = this.sort;

  /**
   * Include child relationship query and move down to the child query context
   *
   * @param childRelName - Child relationship name
   * @param conditions - Conditions of the child query (`null` means none)
   * @param fields - Fields of the child query (`null` means default)
   * @param options - limit / offset / sort of the child query
   * @returns The child query; call `end()` on it to get back to this query
   * @throws Error when the query was created from a raw SOQL string
   */
  include<
    CRN extends ChildRelationshipNames<S, N>,
    CN extends ChildRelationshipSObjectName<S, N, CRN>,
    CFP extends FieldPathSpecifier<S, CN> = FieldPathSpecifier<S, CN>,
    CFPC extends FieldProjectionConfig = FieldPathScopedProjection<S, CN, CFP>,
    CR extends Record = SObjectRecord<S, CN, CFPC>
  >(
    childRelName: CRN,
    conditions?: Optional<QueryCondition<S, CN>>,
    fields?: Optional<QueryField<S, CN, CFP>>,
    options?: { limit?: number; offset?: number; sort?: QuerySort<S, CN> },
  ): SubQuery<S, N, R, QRT, CRN, CN, CR>;
  include<
    CRN extends ChildRelationshipNames<S, N>,
    CN extends SObjectNames<S>,
    CR extends Record = SObjectRecord<S, CN>
  >(
    childRelName: string,
    conditions?: Optional<QueryCondition<S, CN>>,
    fields?: Optional<QueryField<S, CN>>,
    options?: { limit?: number; offset?: number; sort?: QuerySort<S, CN> },
  ): SubQuery<S, N, R, QRT, CRN, CN, CR>;

  include<
    CRN extends ChildRelationshipNames<S, N>,
    CN extends ChildRelationshipSObjectName<S, N, CRN>,
    CFP extends FieldPathSpecifier<S, CN> = FieldPathSpecifier<S, CN>,
    CFPC extends FieldProjectionConfig = FieldPathScopedProjection<S, CN, CFP>,
    CR extends Record = SObjectRecord<S, CN, CFPC>
  >(
    childRelName: CRN | string,
    conditions?: Optional<QueryCondition<S, CN>>,
    fields?: Optional<QueryField<S, CN, CFP>>,
    options: { limit?: number; offset?: number; sort?: QuerySort<S, CN> } = {},
  ): SubQuery<S, N, R, QRT, CRN, CN, CR> {
    if (this._soql) {
      throw Error(
        'Cannot include child relationship into the query which has already built SOQL.',
      );
    }
    const childConfig: QueryConfig<S, CN, CFP> = {
      fields: fields === null ? undefined : fields,
      table: childRelName,
      conditions: conditions === null ? undefined : conditions,
      limit: options.limit,
      offset: options.offset,
      sort: options.sort,
    };
    // eslint-disable-next-line no-use-before-define
    const childQuery = new SubQuery<S, N, R, QRT, CRN, CN, CR>(
      this._conn,
      childRelName as CRN,
      childConfig,
      this,
    );
    this._children.push(childQuery);
    return childQuery;
  }

  /**
   * Include child relationship queries, but not moving down to the children context
   *
   * @throws Error when the query was created from a raw SOQL string
   */
  includeChildren(
    includes: {
      [CRN in ChildRelationshipNames<S, N>]?: QueryConfig<
        S,
        ChildRelationshipSObjectName<S, N, CRN>
      >;
    },
  ) {
    type CRN = ChildRelationshipNames<S, N>;
    if (this._soql) {
      throw Error(
        'Cannot include child relationship into the query which has already built SOQL.',
      );
    }
    for (const crname of Object.keys(includes) as CRN[]) {
      const { conditions, fields, ...options } = includes[
        crname
      ] as QueryConfig<S, ChildRelationshipSObjectName<S, N, CRN>>;
      this.include(crname, conditions, fields, options);
    }
    return this;
  }

  /**
   * Setting maxFetch query option
   */
  maxFetch(maxFetch: number) {
    this._options.maxFetch = maxFetch;
    return this;
  }

  /**
   * Switching auto fetch mode
   */
  autoFetch(autoFetch: boolean) {
    this._options.autoFetch = autoFetch;
    return this;
  }

  /**
   * Set flag to scan all records including deleted and archived.
   */
  scanAll(scanAll: boolean) {
    this._options.scanAll = scanAll;
    return this;
  }

  /**
   * Set the response target; values that are not a known target are ignored.
   */
  setResponseTarget<QRT1 extends QueryResponseTarget>(
    responseTarget: QRT1,
  ): Query<S, N, R, QRT1> {
    if (responseTarget in ResponseTargets) {
      this._options.responseTarget = responseTarget;
    }
    // force change query response target without changing instance
    return (this as Query<S, N, R>) as Query<S, N, R, QRT1>;
  }

  /**
   * Execute query and fetch records from server.
   *
   * The request runs asynchronously; results are delivered through the
   * instance events, its stream and its promise interface.
   *
   * @param options_ - Per-call overrides of the instance options. Only
   *   `undefined` / `null` entries fall back to the instance option, so an
   *   explicit `false` (or `0`) is honoured.
   * @returns This instance, retyped to the requested response target
   * @throws Error when the query was already executed or already finished
   */
  execute<QRT1 extends QueryResponseTarget = QRT>(
    options_: Partial<QueryOptions> & { responseTarget?: QRT1 } = {},
  ): Query<S, N, R, QRT1> {
    if (this._executed) {
      throw new Error('re-executing already executed query');
    }

    if (this._finished) {
      throw new Error('executing already closed query');
    }

    // `??` instead of `||`: a caller passing `autoFetch: false` or
    // `scanAll: false` must be able to override a truthy instance option
    const options = {
      headers: options_.headers ?? this._options.headers,
      responseTarget: options_.responseTarget ?? this._options.responseTarget,
      autoFetch: options_.autoFetch ?? this._options.autoFetch,
      maxFetch: options_.maxFetch ?? this._options.maxFetch,
      scanAll: options_.scanAll ?? this._options.scanAll,
    };

    // collect fetched records in array
    // only when response target is Records and
    // either callback or chaining promises are available to this query.
    this.once('fetch', () => {
      if (
        options.responseTarget === ResponseTargets.Records &&
        this._chaining
      ) {
        this._logger.debug('--- collecting all fetched records ---');
        const records: Record[] = [];
        const onRecord = (record: Record) => records.push(record);
        this.on('record', onRecord);
        this.once('end', () => {
          this.removeListener('record', onRecord);
          this.emit('response', records, this);
        });
      }
    });

    // flag to prevent re-execution
    this._executed = true;

    (async () => {
      // start actual query
      this._logger.debug('>>> Query start >>>');
      try {
        await this._execute(options);
        this._logger.debug('*** Query finished ***');
      } catch (error) {
        this._logger.debug('--- Query error ---', error);
        this.emit('error', error);
      }
    })();

    // return Query instance for chaining
    return (this as Query<S, N, R>) as Query<S, N, R, QRT1>;
  }

  /**
   * Synonym of Query#execute()
   */
  exec = this.execute;

  /**
   * Synonym of Query#execute()
   */
  run = this.execute;

  /**
   * Build the request URL for the current locator ('' when there is none).
   */
  private locatorToUrl() {
    return this._locator
      ? [this._conn._baseUrl(), '/query/', this._locator].join('')
      : '';
  }

  /**
   * Extract the locator (last path segment) from a URL.
   */
  private urlToLocator(url: string) {
    return url.split('/').pop();
  }

  /**
   * Build the value handed to the caller for the given response target
   * from the records / total size accumulated on this instance.
   */
  private constructResponse(
    rawDone: boolean,
    responseTarget: QueryResponseTarget[3],
  ): number;
  private constructResponse(
    rawDone: boolean,
    responseTarget: QueryResponseTarget[2],
  ): R;
  private constructResponse(
    rawDone: boolean,
    responseTarget: QueryResponseTarget[1],
  ): R[];
  private constructResponse(
    rawDone: boolean,
    responseTarget: QueryResponseTarget[0],
  ): QueryResult<R>;
  private constructResponse(
    rawDone: boolean,
    responseTarget: QueryResponseTarget,
  ): QueryResult<R> | R[] | number | R | null {
    switch (responseTarget) {
      case 'Count':
        return this.totalSize;
      case 'SingleRecord':
        return this.records?.[0] ?? null;
      case 'Records':
        return this.records;
      // QueryResult is default response target
      default:
        return {
          ...{
            records: this.records,
            totalSize: this.totalSize,
            done: rawDone ?? true, // when no records, done is omitted
          },
          ...(this._locator ? { nextRecordsUrl: this.locatorToUrl() } : {}),
        };
    }
  }

  /**
   * Fetch one page and, unless the query is finished, recurse to fetch the
   * next one. Emits `fetch` once per page, `record` per streamed record, and
   * `response` (except for the `Records` target) and `end` when finished.
   *
   * @private
   */
  async _execute<QRT1 extends QueryResponseTarget>(
    options: QueryOptions,
  ): Promise<QueryResponse<R>> {
    const { headers, responseTarget, autoFetch, maxFetch, scanAll } = options;
    this._logger.debug('execute with options', options);
    let url;
    if (this._locator) {
      url = this.locatorToUrl();
    } else {
      const soql = await this.toSOQL();
      this._logger.debug(`SOQL = ${soql}`);
      url = [
        this._conn._baseUrl(),
        '/',
        scanAll ? 'queryAll' : 'query',
        '?q=',
        encodeURIComponent(soql),
      ].join('');
    }
    const data = await this._conn.request<R>({ method: 'GET', url, headers });
    this.emit('fetch');
    this.totalSize = data.totalSize;

    // treat a response without `records` as an empty page instead of
    // failing on `.length` of undefined
    const pageRecords = data.records ?? [];

    // If autoFetch is true, fetch all records for any subqueries
    if (autoFetch && pageRecords.length > 0) {
      const recordKeys = Object.keys(pageRecords[0]);
      for (const record of pageRecords) {
        for (const key of recordKeys) {
          const field = record[key];
          // a partially fetched subquery result carries both `records`
          // and `nextRecordsUrl`
          if (
            field &&
            typeof field === 'object' &&
            'records' in field &&
            'nextRecordsUrl' in field
          ) {
            record[key] = {
              ...field,
              records: await this._fetchAllSubqueryRecords(
                record,
                key,
                headers,
              ),
              done: true,
              nextRecordsUrl: undefined,
            };
          }
        }
      }
    }

    // never accumulate more than `maxFetch` records
    this.records = this.records?.concat(
      maxFetch - this.records.length > pageRecords.length
        ? pageRecords
        : pageRecords.slice(0, maxFetch - this.records.length),
    );
    this._locator = data.nextRecordsUrl
      ? this.urlToLocator(data.nextRecordsUrl)
      : undefined;
    this._finished =
      this._finished ||
      data.done ||
      !autoFetch ||
      this.records.length === maxFetch ||
      // this is what the response looks like when there are no results
      (pageRecords.length === 0 && data.done === undefined);

    // streaming record instances
    const numRecords = pageRecords.length;
    let totalFetched = this.totalFetched;
    for (let i = 0; i < numRecords; i++) {
      if (totalFetched >= maxFetch) {
        this._finished = true;
        break;
      }
      const record = pageRecords[i];
      this.emit('record', record, totalFetched, this);
      totalFetched += 1;
    }
    this.totalFetched = totalFetched;

    if (this._finished) {
      const response = this.constructResponse(data.done, responseTarget);
      // only fire response event when it should be notified per fetch
      if (responseTarget !== ResponseTargets.Records) {
        this.emit('response', response, this);
      }
      this.emit('end');
      return response;
    } else {
      return this._execute(options);
    }
  }

  /**
   * Obtain readable stream instance
   *
   * Starts the query with `autoFetch: true` when it has not been executed yet.
   */
  stream(type: 'record'): Serializable<R>;
  stream(type: 'csv'): Readable;
  stream(type: 'record' | 'csv' = 'csv') {
    if (!this._finished && !this._executed) {
      this.execute({ autoFetch: true });
    }
    return type === 'record' ? this._stream : this._stream.stream(type);
  }

  /**
   * Pipe the queried records to another stream
   * This is for backward compatibility; Query is not a record stream instance anymore in 2.0.
   * If you want a record stream instance, use `Query#stream('record')`.
   */
  pipe(stream: NodeJS.WritableStream) {
    return this.stream('record').pipe(stream);
  }

  /**
   * Expand `*` field paths of this query and of all child queries, then
   * store the child query configs under `includes` of this query's config.
   *
   * @param sobject_ - Object name to expand against; defaults to the
   *   configured table
   * @throws Error when the query was created from a raw SOQL string
   * @protected
   */
  async _expandFields(sobject_?: string): Promise<void> {
    if (this._soql) {
      throw new Error(
        'Cannot expand fields for the query which has already built SOQL.',
      );
    }
    const { fields = [], table = '' } = this._config;
    const sobject = sobject_ || table;
    this._logger.debug(
      `_expandFields: sobject = ${sobject}, fields = ${fields.join(', ')}`,
    );
    // own fields and every child query are expanded concurrently;
    // only the first entry (own fields) is used from the result
    const [efields] = await Promise.all([
      this._expandAsteriskFields(sobject, fields),
      ...this._children.map(async (childQuery) => {
        await childQuery._expandFields();
        return [] as string[];
      }),
    ]);
    this._config.fields = efields;
    this._config.includes = this._children
      .map((cquery) => {
        const cconfig = cquery._query._config;
        return [cconfig.table, cconfig] as [string, SOQLQueryConfig];
      })
      .reduce<{ [name: string]: SOQLQueryConfig }>(
        (includes, [ctable, cconfig]) => ({
          ...includes,
          [ctable]: cconfig,
        }),
        {},
      );
  }

  /**
   * Resolve the child object name of a child relationship of the queried
   * table (relationship names are compared case-insensitively).
   *
   * @throws Error when no table is configured or no relationship matches
   */
  async _findRelationObject(relName: string): Promise<string> {
    const table = this._config.table;
    if (!table) {
      throw new Error('No table information provided in the query');
    }
    this._logger.debug(
      `finding table for relation "${relName}" in "${table}"...`,
    );
    const sobject = await this._conn.describe$(table);
    const upperRname = relName.toUpperCase();
    for (const cr of sobject.childRelationships) {
      if (
        (cr.relationshipName || '').toUpperCase() === upperRname &&
        cr.childSObject
      ) {
        return cr.childSObject;
      }
    }
    throw new Error(`No child relationship found: ${relName}`);
  }

  /**
   * Expand every field path of the list and flatten the results.
   */
  async _expandAsteriskFields(
    sobject: string,
    fields: string[],
  ): Promise<string[]> {
    const expandedFields = await Promise.all(
      fields.map(async (field) => this._expandAsteriskField(sobject, field)),
    );
    return expandedFields.reduce(
      (eflds: string[], flds: string[]): string[] => [...eflds, ...flds],
      [],
    );
  }

  /**
   * Expand a single field path. A path ending with `*` is replaced by all
   * field names of the described object (following relationship segments
   * first); any other path is returned unchanged.
   */
  async _expandAsteriskField(
    sobject: string,
    field: string,
  ): Promise<string[]> {
    this._logger.debug(`expanding field "${field}" in "${sobject}"...`);
    const fpath = field.split('.');
    if (fpath[fpath.length - 1] === '*') {
      const so = await this._conn.describe$(sobject);
      this._logger.debug(`table ${sobject} has been described`);
      if (fpath.length > 1) {
        const rname = fpath.shift();
        for (const f of so.fields) {
          if (
            f.relationshipName &&
            rname &&
            f.relationshipName.toUpperCase() === rname.toUpperCase()
          ) {
            const rfield = f;
            const referenceTo = rfield.referenceTo || [];
            // fall back to 'Name' unless exactly one target object is referenced
            const rtable = referenceTo.length === 1 ? referenceTo[0] : 'Name';
            const fpaths = await this._expandAsteriskField(
              rtable,
              fpath.join('.'),
            );
            return fpaths.map((fp) => `${rname}.${fp}`);
          }
        }
        return [];
      }
      return so.fields.map((f) => f.name);
    }
    return [field];
  }

  /**
   * Explain plan for executing query
   */
  async explain() {
    const soql = await this.toSOQL();
    this._logger.debug(`SOQL = ${soql}`);
    const url = `/query/?explain=${encodeURIComponent(soql)}`;
    return this._conn.request<QueryExplainResult>(url);
  }

  /**
   * Return SOQL expression for the query
   */
  async toSOQL() {
    if (this._soql) {
      return this._soql;
    }
    await this._expandFields();
    return createSOQL(this._config);
  }

  /**
   * Promise/A+ interface
   * http://promises-aplus.github.io/promises-spec/
   *
   * Delegate to deferred promise, return promise instance for query result.
   * Marks the query as chained and executes it when not executed yet.
   */
  then<U, V>(
    onResolve?:
      | ((qr: QueryResponse<R, QRT>) => U | Promise<U>)
      | null
      | undefined,
    onReject?: ((err: Error) => V | Promise<V>) | null | undefined,
  ): Promise<U | V> {
    this._chaining = true;
    if (!this._finished && !this._executed) {
      this.execute();
    }
    if (!this._promise) {
      throw new Error(
        'invalid state: promise is not set after query execution',
      );
    }
    return this._promise.then(onResolve, onReject);
  }

  /**
   * Shorthand for `then(null, onReject)`.
   */
  catch(
    onReject: (
      err: Error,
    ) => QueryResponse<R, QRT> | Promise<QueryResponse<R, QRT>>,
  ): Promise<QueryResponse<R, QRT>> {
    return this.then(null, onReject);
  }

  /**
   * Return a native promise for the query response.
   * `Promise.resolve()` adopts this instance through its `then()` method.
   */
  promise(): Promise<QueryResponse<R, QRT>> {
    // TODO(cristian): verify this is correct
    return Promise.resolve((this as unknown) as QueryResponse<R, QRT>);
  }

  /**
   * Resolve how many records may be collected before switching to a bulk
   * API. A negative result means the bulk API must never be used.
   */
  private resolveBulkThreshold(options: {
    allowBulk?: boolean;
    bulkThreshold?: number;
  }): number {
    if (options.allowBulk === false) {
      return -1;
    }
    if (typeof options.bulkThreshold === 'number') {
      return options.bulkThreshold;
    }
    // determine threshold if the connection version supports SObject collection API or not
    return this._conn._ensureVersion(42)
      ? DEFAULT_BULK_THRESHOLD
      : this._conn._maxRequest / 2;
  }

  /**
   * Bulk delete queried records
   *
   * @param type - Object type to delete from; defaults to the configured table
   * @param options - Bulk switching options
   * @throws Error when the object type cannot be determined
   */
  destroy(options?: QueryDestroyOptions): Promise<SaveResult[]>;
  destroy(type: N, options?: QueryDestroyOptions): Promise<SaveResult[]>;
  destroy(type?: N | QueryDestroyOptions, options?: QueryDestroyOptions) {
    if (typeof type === 'object' && type !== null) {
      options = type;
      type = undefined;
    }
    options = options || {};
    const type_: Optional<N> = type || (this._config.table as Optional<N>);
    if (!type_) {
      throw new Error(
        'SOQL based query needs SObject type information to bulk delete.',
      );
    }
    // Set the threshold number to pass to bulk API
    const thresholdNum = this.resolveBulkThreshold(options);

    const bulkApiVersion = options.bulkApiVersion ?? DEFAULT_BULK_API_VERSION;

    return new Promise((resolve, reject) => {
      const createBatch = () =>
        this._conn
          .sobject(type_)
          .deleteBulk()
          .on('response', resolve)
          .on('error', reject);
      let records: Record[] = [];
      let batch: ReturnType<typeof createBatch> | null = null;
      const handleRecord = (rec: Record) => {
        if (!rec.Id) {
          const err = new Error(
            'Queried record does not include Salesforce record ID.',
          );
          this.emit('error', err);
          return;
        }
        const record: Record = { Id: rec.Id };
        if (batch) {
          batch.write(record);
        } else {
          records.push(record);
          if (
            thresholdNum >= 0 &&
            records.length > thresholdNum &&
            bulkApiVersion === 1
          ) {
            // Use bulk delete instead of SObject REST API
            batch = createBatch();
            for (const record of records) {
              batch.write(record);
            }
            records = [];
          }
        }
      };
      const handleEnd = () => {
        if (batch) {
          batch.end();
          return;
        }
        // `thresholdNum >= 0` keeps `allowBulk: false` effective for
        // Bulk API 2.0 as well (the threshold is -1 in that case)
        const useBulkV2 =
          thresholdNum >= 0 &&
          records.length > thresholdNum &&
          bulkApiVersion === 2;
        if (useBulkV2) {
          this._conn.bulk2
            .loadAndWaitForResults({
              object: type_,
              operation: 'delete',
              input: records,
            })
            .then(
              (allResults) =>
                resolve(this.mapBulkV2ResultsToSaveResults(allResults)),
              reject,
            );
        } else {
          const ids = records.map((record) => record.Id as string);
          this._conn
            .sobject(type_)
            .destroy(ids, { allowRecursive: true })
            .then(resolve, reject);
        }
      };
      this.stream('record')
        .on('data', handleRecord)
        .on('end', handleEnd)
        .on('error', reject);
    });
  }

  /**
   * Synonym of Query#destroy()
   */
  delete = this.destroy;

  /**
   * Synonym of Query#destroy()
   */
  del = this.destroy;

  /**
   * Bulk update queried records, using given mapping function/object
   *
   * @param mapping - Function converting a queried record to its update
   *   record, or a record template applied to every queried record
   * @param type - Object type to update; defaults to the configured table
   * @param options - Bulk switching options
   * @throws Error when the object type cannot be determined
   */
  update<UR extends SObjectInputRecord<S, N>>(
    mapping: ((rec: R) => UR) | UR,
    type: N,
    options?: QueryUpdateOptions,
  ): Promise<SaveResult[]>;
  update<UR extends SObjectInputRecord<S, N>>(
    mapping: ((rec: R) => UR) | UR,
    options?: QueryUpdateOptions,
  ): Promise<SaveResult[]>;
  update<UR extends SObjectInputRecord<S, N>>(
    mapping: ((rec: R) => UR) | UR,
    type?: N | QueryUpdateOptions,
    options?: QueryUpdateOptions,
  ) {
    if (typeof type === 'object' && type !== null) {
      options = type;
      type = undefined;
    }
    options = options || {};
    const type_: Optional<N> =
      type || (this._config && (this._config.table as Optional<N>));
    if (!type_) {
      throw new Error(
        'SOQL based query needs SObject type information to bulk update.',
      );
    }
    const updateStream =
      typeof mapping === 'function'
        ? RecordStream.map(mapping)
        : RecordStream.recordMapStream(mapping, options.skipRecordTemplateEval);
    // Set the threshold number to pass to bulk API
    const thresholdNum = this.resolveBulkThreshold(options);
    const bulkApiVersion = options.bulkApiVersion ?? DEFAULT_BULK_API_VERSION;
    return new Promise((resolve, reject) => {
      const createBatch = () =>
        this._conn
          .sobject(type_)
          .updateBulk()
          .on('response', resolve)
          .on('error', reject);
      let records: Array<SObjectUpdateRecord<S, N>> = [];
      let batch: ReturnType<typeof createBatch> | null = null;
      const handleRecord = (record: Record) => {
        if (batch) {
          batch.write(record);
        } else {
          records.push(record as SObjectUpdateRecord<S, N>);
        }
        if (
          thresholdNum >= 0 &&
          records.length > thresholdNum &&
          bulkApiVersion === 1
        ) {
          // Use bulk update instead of SObject REST API
          batch = createBatch();
          for (const record of records) {
            batch.write(record);
          }
          records = [];
        }
      };
      const handleEnd = () => {
        if (batch) {
          batch.end();
          return;
        }
        // `thresholdNum >= 0` keeps `allowBulk: false` effective for
        // Bulk API 2.0 as well (the threshold is -1 in that case)
        const useBulkV2 =
          thresholdNum >= 0 &&
          records.length > thresholdNum &&
          bulkApiVersion === 2;
        if (useBulkV2) {
          this._conn.bulk2
            .loadAndWaitForResults({
              object: type_,
              operation: 'update',
              input: records,
            })
            .then(
              (allResults) =>
                resolve(this.mapBulkV2ResultsToSaveResults(allResults)),
              reject,
            );
        } else {
          this._conn
            .sobject(type_)
            .update(records, { allowRecursive: true })
            .then(resolve, reject);
        }
      };
      this.stream('record')
        .on('error', reject)
        .pipe(updateStream)
        .on('data', handleRecord)
        .on('end', handleEnd)
        .on('error', reject);
    });
  }

  /**
   * Convert Bulk API 2.0 job results to `SaveResult`s: successful rows
   * first, then failed rows (which carry no record id).
   */
  private mapBulkV2ResultsToSaveResults(
    bulkJobAllResults: IngestJobV2Results<S>,
  ): SaveResult[] {
    const successSaveResults: SaveResult[] = bulkJobAllResults.successfulResults.map(
      (r) => {
        const saveResult: SaveResult = {
          id: r.sf__Id,
          success: true,
          errors: [],
        };
        return saveResult;
      },
    );

    const failedSaveResults = bulkJobAllResults.failedResults.map((r) => {
      const saveResult: SaveResult = {
        success: false,
        errors: [
          {
            errorCode: r.sf__Error,
            message: r.sf__Error,
          },
        ],
      };
      return saveResult;
    });

    return [...successSaveResults, ...failedSaveResults];
  }

  /**
   * Fetches all records for a subquery field by following nextRecordsUrl
   *
   * @param record - Parent record holding the subquery result
   * @param fieldName - Name of the field holding the subquery result
   * @param headers - Headers sent with every follow-up request
   * @returns Records already present in the field plus all following pages
   * @private
   */
  private async _fetchAllSubqueryRecords(
    record: Record,
    fieldName: string,
    headers: { [name: string]: string },
  ): Promise<Record[]> {
    const subqueryField = record[fieldName] as QueryResult<Record>;

    if (!subqueryField || !subqueryField.records) {
      return [];
    }

    let allRecords = [...subqueryField.records];
    let nextRecordsUrl = subqueryField.nextRecordsUrl;

    while (nextRecordsUrl) {
      // When following nextRecordsUrl for a subquery, we need to preserve the relationship context
      // by using the full URL as-is rather than trying to reconstruct it
      const data = await this._conn.request<QueryResult<Record>>({
        method: 'GET',
        url: nextRecordsUrl, // Use the full URL as provided by Salesforce
        headers,
      });

      // a page without `records` contributes nothing (a bare `concat`
      // would append an `undefined` element)
      allRecords = allRecords.concat(data.records ?? []);
      nextRecordsUrl = data.nextRecordsUrl;
    }

    return allRecords;
  }
}

/*--------------------------------------------*/

/**
 * SubQuery object for representing child relationship query
 *
 * Wraps a `Query` for the child object and keeps a reference to the parent
 * query so that `end()` can return to the parent context.
 */
export class SubQuery<
  S extends Schema,
  PN extends SObjectNames<S>,
  PR extends Record,
  PQRT extends QueryResponseTarget,
  CRN extends ChildRelationshipNames<S, PN> = ChildRelationshipNames<S, PN>,
  CN extends SObjectNames<S> = ChildRelationshipSObjectName<S, PN, CRN>,
  CR extends Record = Record
> {
  _relName: CRN;
  _query: Query<S, CN, CR>;
  _parent: Query<S, PN, PR, PQRT>;

  /**
   * @param conn - Connection shared with the parent query
   * @param relName - Child relationship name
   * @param config - Query config of the child query
   * @param parent - Parent query
   */
  constructor(
    conn: Connection<S>,
    relName: CRN,
    config: QueryConfig<S, CN>,
    parent: Query<S, PN, PR, PQRT>,
  ) {
    this._relName = relName;
    this._query = new Query(conn, config);
    this._parent = parent;
  }

  /**
   * Select fields of the child query
   */
  select<
    R extends Record = Record,
    FP extends FieldPathSpecifier<S, CN> = FieldPathSpecifier<S, CN>,
    FPC extends FieldProjectionConfig = FieldPathScopedProjection<S, CN, FP>
  >(
    fields: QueryField<S, CN, FP>,
  ): SubQuery<S, PN, PR, PQRT, CRN, CN, SObjectRecord<S, CN, FPC, R>> {
    // force convert query record type without changing instance
    this._query = this._query.select(fields) as any;
    return (this as any) as SubQuery<
      S,
      PN,
      PR,
      PQRT,
      CRN,
      CN,
      SObjectRecord<S, CN, FPC, R>
    >;
  }

  /**
   * Set conditions of the child query
   */
  where(conditions: QueryCondition<S, CN> | string): this {
    this._query = this._query.where(conditions);
    return this;
  }

  /**
   * Limit the returning result
   */
  limit(limit: number) {
    this._query = this._query.limit(limit);
    return this;
  }

  /**
   * Skip records
   */
  skip(offset: number) {
    this._query = this._query.skip(offset);
    return this;
  }

  /**
   * Synonym of SubQuery#skip()
   */
  offset = this.skip;

  /**
   * Set query sort with direction
   */
  sort(sort: QuerySort<S, CN>): this;
  sort(sort: string | SObjectFieldNames<S, CN>, dir: SortDir): this;
  sort(
    sort: QuerySort<S, CN> | SObjectFieldNames<S, CN> | string,
    dir?: SortDir,
  ) {
    this._query = this._query.sort(sort as any, dir as SortDir);
    return this;
  }

  /**
   * Synonym of SubQuery#sort()
   */
  orderby: typeof SubQuery.prototype.sort = this.sort;

  /**
   * Expand the fields of the child query against the child object resolved
   * from the parent's relationship name.
   */
  async _expandFields() {
    const sobject = await this._parent._findRelationObject(this._relName);
    return this._query._expandFields(sobject);
  }

  /**
   * Back the context to parent query object
   */
  end<
    CRP extends SObjectChildRelationshipProp<
      CRN,
      CR
    > = SObjectChildRelationshipProp<CRN, CR>,
    PR1 extends Record = PR & CRP
  >(): Query<S, PN, PR1, PQRT> {
    return (this._parent as any) as Query<S, PN, PR1, PQRT>;
  }
}

export default Query;