/* eslint-disable no-div-regex */ /* eslint-disable line-comment-position */ /* eslint-disable no-inline-comments */ /* eslint-disable require-unicode-regexp */ /* eslint-disable @typescript-eslint/naming-convention */ import type { OpenApi, OpenApiClientConfiguration, OperationWithPathInfo } from '@comake/openapi-operation-executor'; import { OpenApiOperationExecutor } from '@comake/openapi-operation-executor'; import { getIdFromNodeObjectIfDefined, XSD, type ReferenceNodeObject } from '@comake/rmlmapper-js'; import type { AxiosError, AxiosRequestConfig, AxiosResponse } from 'axios'; import axios from 'axios'; import type { ContextDefinition, GraphObject, NodeObject } from 'jsonld'; import type { Frame } from 'jsonld/jsonld-spec'; import { JSONPath } from 'jsonpath-plus'; import SHACLValidator from 'rdf-validate-shacl'; import type ValidationReport from 'rdf-validate-shacl/src/validation-report'; import { EngineConstants, OPEN_API_RUNTIME_AUTHORIZATION, PROP_ENTITY_ID, PROP_ENTITY_TYPE, PROP_ENTITY_VALUE, RDF, RDFS, RML_LIST, SHACL } from './constants'; import { globalCustomCapabilities } from './customCapabilities'; import { globalHooks, HookStages, HookTypes } from './hooks/globalHooks'; import type { ExecutionOptions } from './JsExecutor'; import type { ICodeExecutor } from './JsExecutor/types'; import { Logger } from './logger'; import { Mapper } from './mapping/Mapper'; import type { ReadCacheOperation, ReadCachePolicy, ReadCacheStore, SklEngineOptions } from './SklEngineOptions'; import type { FindOperator } from './storage/FindOperator'; import type { FindAllOptions, FindOneOptions, FindOptionsWhere } from './storage/FindOptionsTypes'; import type { GroupByOptions, GroupByResponse } from './storage/GroupOptionTypes'; import { Exists } from './storage/operator/Exists'; import { In } from './storage/operator/In'; import { InversePath } from './storage/operator/InversePath'; import { Not } from './storage/operator/Not'; import { OneOrMorePath } from 
'./storage/operator/OneOrMorePath';
import { SequencePath } from './storage/operator/SequencePath';
import { ZeroOrMorePath } from './storage/operator/ZeroOrMorePath';
import type { QueryAdapter, RawQueryResult } from './storage/query-adapter/QueryAdapter';
import { SparqlQueryAdapter } from './storage/query-adapter/sparql/SparqlQueryAdapter';
import { PerformanceLogger } from './util/PerformanceLogger';
import { buildReadCacheKey, ReadCacheSingleflight } from './util/ReadCacheHelper';
// Import './util/safeJsonStringify';
import type {
  Callbacks,
  Capability,
  CapabilityConfig,
  CapabilityMapping,
  Entity,
  JSONObject,
  JSONValue,
  Mapping,
  MappingWithInputs,
  MappingWithInputsReference,
  MappingWithOutputsMapping,
  MappingWithParallel,
  MappingWithSeries,
  OperationResponse,
  OrArray,
  RdfList,
  SeriesCapabilityArgs,
  SKLEngineInterface,
  TriggerMapping
} from './util/Types';
import { convertJsonLdToQuads, ensureArray, getValueIfDefined, toJSON } from './util/Util';
import { SKL_DATA_NAMESPACE, SKL_NAMESPACE } from './util/Vocabularies';

/**
 * Call signature of a capability: takes the capability parameters plus an
 * optional per-call configuration and resolves asynchronously.
 */
export type CapabilityHandler = = OrArray>(
  params: JSONObject,
  capabilityConfig?: CapabilityConfig
) => Promise;

/** Shape of the `capability` proxy exposed by the engine. */
export type CapabilityInterface = Record;

/** Resolves to `JSONObject` when `T` is `true`, otherwise to `NodeObject`. */
export type MappingResponseOption = T extends true ?
JSONObject : NodeObject;

/** Options accepted by the write operations (`save`, `update`, `delete`). */
export type WriteOptions = { bypassHooks?: boolean };

/** TTL used for cached reads when the cache policy does not supply `ttlMs`. */
const DEFAULT_READ_CACHE_TTL_MS = 60_000;
/** Property that marks a cached value as wrapped in a {@link ReadCacheEnvelope}. */
const READ_CACHE_ENVELOPE_MARKER = '__sklReadCacheEnvelopeV1';

/** Wrapper stored in the read cache so a cached value can be told apart from a raw one. */
type ReadCacheEnvelope = {
  [READ_CACHE_ENVELOPE_MARKER]: true;
  value: T;
};

/**
 * Engine facade over a {@link QueryAdapter}: entity reads and writes wrapped in
 * global hooks and performance spans, plus mapping and capability execution.
 */
export class SKLEngine implements SKLEngineInterface {
  private readonly queryAdapter: QueryAdapter;
  private readonly functions?: Record any>;
  private readonly inputFiles?: Record;
  private readonly globalCallbacks?: Callbacks;
  private readonly disableValidation?: boolean;
  public readonly capability: CapabilityInterface;
  private codeExecutor: ICodeExecutor | undefined;
  private readonly skdsEndpointUrl: string;
  private readonly scriptPath: string;
  private readonly logger: Logger;
  private readonly readCache?: ReadCacheStore;
  private readonly readCachePolicy?: ReadCachePolicy;
  private readonly readCacheNamespace?: string;
  private readonly readCacheSingleflight = new ReadCacheSingleflight();

  /**
   * Builds the engine on top of a SPARQL query adapter and copies the relevant
   * options (validation flag, callbacks, input files, read-cache settings).
   *
   * @param options - Engine options; user `functions` are wrapped so that each
   * call receives the engine instance as `data.skl`.
   */
  public constructor(options: SklEngineOptions) {
    this.queryAdapter = new SparqlQueryAdapter(options);
    this.disableValidation = options.disableValidation;
    this.globalCallbacks = options.callbacks;
    this.inputFiles = options.inputFiles;
    this.skdsEndpointUrl = (options as any).endpointUrl;
    this.scriptPath = (options as any).scriptPath;
    this.readCache = options.readCache;
    this.readCachePolicy = options.readCachePolicy;
    this.readCacheNamespace = options.readCacheNamespace;
    if (options.functions) {
      this.functions = Object.fromEntries(
        Object.entries(options.functions).map(([key, func]) => [
          key,
          (data: Record = {}) => {
            // Add the SKL instance to the data object
            // eslint-disable-next-line @typescript-eslint/no-this-alias
            data.skl = this;
            // Call the original function
            return func(data);
          }
        ])
      );
    }
    this.logger = Logger.getInstance(options.debugMode);
    // Every property read on `capability` yields a handler that executes the
    // capability whose label equals the property name.
    // eslint-disable-next-line func-style
    const getCapabilityHandler = (getTarget: CapabilityInterface, property: string): CapabilityHandler =>
      async = OrArray>(
        capabilityArgs: JSONObject,
capabilityConfig?: CapabilityConfig
      ): Promise => this.executeCapabilityByName(property, capabilityArgs, capabilityConfig) as Promise;
    this.capability = new Proxy({} as CapabilityInterface, { get: getCapabilityHandler });
  }

  /** Registers the executor stored in `codeExecutor`. */
  public setCodeExecutor(codeExecutor: ICodeExecutor): void {
    this.codeExecutor = codeExecutor;
  }

  /** Delegates a raw query string to the query adapter. */
  public async executeRawQuery(query: string): Promise {
    const queryResult = await this.queryAdapter.executeRawQuery(query);
    return queryResult;
  }

  /** Delegates a raw update string to the query adapter. */
  public async executeRawUpdate(query: string): Promise {
    const updateResult = await this.queryAdapter.executeRawUpdate(query);
    return updateResult;
  }

  /** Delegates a raw construct query (with an optional frame) to the query adapter. */
  public async executeRawConstructQuery(query: string, frame?: Frame): Promise {
    const constructed = await this.queryAdapter.executeRawConstructQuery(query, frame);
    return constructed;
  }

  /**
   * Returns a structured clone of `value`; when cloning throws, the original
   * reference is returned unchanged.
   */
  private cloneForReadCache(value: T): T {
    let copy = value;
    try {
      copy = structuredClone(value);
    } catch {
      // Not cloneable: fall back to the original reference.
    }
    return copy;
  }

  /** Wraps `value` in the envelope shape that is written to the read cache. */
  private wrapReadCacheValue(value: T): ReadCacheEnvelope {
    const envelope = { [READ_CACHE_ENVELOPE_MARKER]: true, value };
    return envelope;
  }

  /** Type guard: true for non-null objects carrying the envelope marker set to `true`. */
  private isReadCacheEnvelope(value: unknown): value is ReadCacheEnvelope {
    const isObjectLike = Boolean(value) && typeof value === 'object';
    return isObjectLike && (value as Record)[READ_CACHE_ENVELOPE_MARKER] === true;
  }

  /** Returns a shallow copy of `options` without `bypassCache`; falsy input is returned as is. */
  private stripBypassCacheFromFindOneOptions(options?: FindOneOptions): FindOneOptions | undefined {
    if (options) {
      const { bypassCache, ...remaining } = options;
      return remaining;
    }
    return options;
  }

  /** Returns a shallow copy of `options` without `bypassCache`; falsy input is returned as is. */
  private stripBypassCacheFromFindAllOptions(options?: FindAllOptions): FindAllOptions | undefined {
    if (options) {
      const { bypassCache, ...remaining } = options;
      return remaining;
    }
    return options;
  }

  /** Returns a shallow copy of `where` without `bypassCache`; falsy input is returned as is. */
  private stripBypassCacheFromWhere(where?: FindOptionsWhere): FindOptionsWhere | undefined {
    if (where) {
      const { bypassCache, ...remaining } = where;
      return remaining;
    }
    return where;
  }

  /**
   * Runs a read through the optional read cache.
   *
   * The read is executed directly when the caller asked to bypass the cache or
   * when no cache store / cache policy is configured.
   */
  private async runReadWithCache(params: {
    operation: ReadCacheOperation;
    args: readonly unknown[];
    bypassCache?: boolean;
    execute: () => Promise;
  }): Promise {
    if (params.bypassCache || !this.readCache || !this.readCachePolicy) {
      return await params.execute();
    }
    const policyDecision =
this.readCachePolicy({
      operation: params.operation,
      args: params.args,
      endpointUrl: this.skdsEndpointUrl,
      namespace: this.readCacheNamespace
    });
    // The policy decides per call whether the read is cacheable and for how long.
    if (!policyDecision.cache) {
      return await params.execute();
    }
    const ttlMs = policyDecision.ttlMs ?? DEFAULT_READ_CACHE_TTL_MS;
    if (ttlMs <= 0) {
      return await params.execute();
    }
    const cacheKey = buildReadCacheKey({
      operation: params.operation,
      args: params.args,
      endpointUrl: this.skdsEndpointUrl,
      namespace: this.readCacheNamespace,
      keyHint: policyDecision.keyHint
    });
    // NOTE(review): `readCacheSingleflight.do` presumably coalesces concurrent
    // callers of the same key - confirm against ReadCacheHelper.
    return await this.readCacheSingleflight.do(cacheKey, async() => {
      try {
        const cached = await this.readCache?.get(cacheKey);
        // Values written below are always wrapped in an envelope, so a bare
        // `null` cannot be a value stored by this method. Treat it like
        // `undefined` (a miss) instead of returning it as the read result.
        if (cached !== undefined && cached !== null) {
          if (this.isReadCacheEnvelope(cached)) {
            return this.cloneForReadCache(cached.value);
          }
          // Non-envelope entry: returned as the value itself.
          return this.cloneForReadCache(cached as T);
        }
      } catch (error) {
        // Cache failures are best-effort: log and fall through to the real read.
        this.logger.debug('Read cache lookup failed; continuing without cache', error as any);
      }
      const result = await params.execute();
      try {
        // Store a clone so later mutations of `result` by the caller do not leak into the cache.
        await this.readCache?.set(cacheKey, this.wrapReadCacheValue(this.cloneForReadCache(result)), ttlMs);
      } catch (error) {
        this.logger.debug('Read cache write failed; continuing without cache', error as any);
      }
      return result;
    });
  }

  /**
   * Finds a single entity matching `options`.
   *
   * Runs the READ hooks (BEFORE, then AFTER or ERROR) around a cache-aware call
   * to the query adapter. `options.bypassCache` is consumed here and is not
   * forwarded to the adapter, the hooks or the cache key arguments.
   *
   * @throws Error when no entity matches.
   */
  public async find(options?: FindOneOptions): Promise {
    const bypassCache = options?.bypassCache;
    const optionsWithoutBypass = this.stripBypassCacheFromFindOneOptions(options);
    return PerformanceLogger.withSpanRoot('SklEngine.find', async() => {
      const context = {
        entities: [],
        operation: 'find',
        operationParameters: { options: optionsWithoutBypass },
        sklEngine: this
      };
      await globalHooks.execute(HookTypes.READ, HookStages.BEFORE, context);
      try {
        const entity = await this.runReadWithCache({
          operation: 'find',
          args: [optionsWithoutBypass],
          bypassCache,
          execute: async() => await this.queryAdapter.find(optionsWithoutBypass)
        });
        if (!entity) {
          throw new Error(`No schema found with fields matching ${JSON.stringify(optionsWithoutBypass)}`);
        }
        const updatedContext = { ...context, entities: [entity]};
        const
afterHookResult = await globalHooks.execute(HookTypes.READ, HookStages.AFTER, updatedContext, entity);
        // A truthy AFTER-hook result takes precedence over the entity that was read.
        return afterHookResult || entity;
      } catch (error: unknown) {
        // The ERROR hook also runs for the "not found" error thrown inside the try block.
        await globalHooks.execute(HookTypes.READ, HookStages.ERROR, context, error);
        throw error;
      }
    }, { options: optionsWithoutBypass });
  }

  /**
   * Finds a single entity matching the `where` clause.
   *
   * `where.bypassCache` is read and then removed before the clause reaches the
   * hooks and the query adapter. Unlike `find`, the return value of the AFTER
   * hook is ignored and the entity that was read is returned.
   *
   * @param where - Field constraints; may carry a `bypassCache` flag.
   * @param notFoundErrorMessage - Message of the error thrown when nothing matches.
   * @throws Error when no entity matches.
   */
  public async findBy(where: FindOptionsWhere, notFoundErrorMessage?: string): Promise {
    const bypassCache = where?.bypassCache;
    const whereWithoutBypass = this.stripBypassCacheFromWhere(where)!;
    return PerformanceLogger.withSpanRoot('SklEngine.findBy', async() => {
      const context = { entities: [], operation: 'findBy', operationParameters: { where: whereWithoutBypass }, sklEngine: this };
      await globalHooks.execute(HookTypes.READ, HookStages.BEFORE, context);
      try {
        // NOTE(review): `notFoundErrorMessage` is part of the cache `args`, so it
        // presumably ends up in the cache key although it does not affect the
        // query - confirm against buildReadCacheKey.
        const entity = await this.runReadWithCache({ operation: 'findBy', args: [whereWithoutBypass, notFoundErrorMessage], bypassCache, execute: async() => await this.queryAdapter.findBy(whereWithoutBypass) });
        if (entity) {
          const updatedContext = { ...context, entities: [entity]};
          await globalHooks.execute(HookTypes.READ, HookStages.AFTER, updatedContext, entity);
          return entity;
        }
        // Thrown inside the try block so the ERROR hook below observes it as well.
        throw new Error(notFoundErrorMessage ??
`No schema found with fields matching ${JSON.stringify(whereWithoutBypass)}`);
      } catch (error: unknown) {
        await globalHooks.execute(HookTypes.READ, HookStages.ERROR, context, error);
        throw error;
      }
    }, { where: whereWithoutBypass });
  }

  /**
   * Same lookup as `findBy`, but resolves to `undefined` instead of rejecting.
   * Every error raised by `findBy` is swallowed, not only "not found".
   */
  public async findByIfExists(options: FindOptionsWhere): Promise {
    try {
      return await this.findBy(options);
    } catch {
      return undefined;
    }
  }

  /**
   * Finds all entities matching `options`, with READ hooks around a cache-aware
   * call to the query adapter. `options.bypassCache` is consumed here and not
   * forwarded.
   */
  public async findAll(options?: FindAllOptions): Promise {
    const bypassCache = options?.bypassCache;
    const optionsWithoutBypass = this.stripBypassCacheFromFindAllOptions(options);
    return PerformanceLogger.withSpanRoot('SklEngine.findAll', async() => {
      const hookContext = {
        entities: [],
        operation: 'findAll',
        operationParameters: { options: optionsWithoutBypass },
        sklEngine: this
      };
      await globalHooks.execute(HookTypes.READ, HookStages.BEFORE, hookContext);
      try {
        const foundEntities = await this.runReadWithCache({
          operation: 'findAll',
          args: [optionsWithoutBypass],
          bypassCache,
          execute: async() => await this.queryAdapter.findAll(optionsWithoutBypass)
        });
        const contextWithEntities = { ...hookContext, entities: foundEntities };
        await globalHooks.execute(HookTypes.READ, HookStages.AFTER, contextWithEntities, foundEntities);
        return foundEntities;
      } catch (error: unknown) {
        await globalHooks.execute(HookTypes.READ, HookStages.ERROR, hookContext, error);
        throw error;
      }
    }, { options: optionsWithoutBypass });
  }

  /** Runs a group-by query through the adapter, wrapped in READ hooks (no read cache). */
  public async groupBy(options: GroupByOptions): Promise {
    return PerformanceLogger.withSpanRoot('SklEngine.groupBy', async() => {
      const hookContext = {
        entities: [],
        operation: 'groupBy',
        operationParameters: { options },
        sklEngine: this
      };
      await globalHooks.execute(HookTypes.READ, HookStages.BEFORE, hookContext);
      try {
        const grouped = await this.queryAdapter.groupBy(options);
        const contextWithResult = { ...hookContext, result: grouped };
        await globalHooks.execute(HookTypes.READ, HookStages.AFTER, contextWithResult, grouped);
        return grouped;
      } catch (error: unknown) {
        await globalHooks.execute(HookTypes.READ, HookStages.ERROR, hookContext, error);
        throw error;
      }
    }, {
options });
  }

  /**
   * Finds all entities matching the `where` clause, wrapped in READ hooks.
   *
   * `bypassCache` is an engine-level flag, not a field constraint, so it is
   * removed before the clause reaches the hooks and the query adapter - the
   * same treatment `findBy` applies. This method does not use the read cache.
   */
  public async findAllBy(where: FindOptionsWhere): Promise {
    const whereWithoutBypass = this.stripBypassCacheFromWhere(where)!;
    return PerformanceLogger.withSpanRoot('SklEngine.findAllBy', async() => {
      const context = {
        entities: [],
        operation: 'findAllBy',
        operationParameters: { where: whereWithoutBypass },
        sklEngine: this
      };
      await globalHooks.execute(HookTypes.READ, HookStages.BEFORE, context);
      try {
        const entities = await this.queryAdapter.findAllBy(whereWithoutBypass);
        const updatedContext = { ...context, entities };
        await globalHooks.execute(HookTypes.READ, HookStages.AFTER, updatedContext, entities);
        return entities;
      } catch (error: unknown) {
        await globalHooks.execute(HookTypes.READ, HookStages.ERROR, context, error);
        throw error;
      }
    }, { where: whereWithoutBypass });
  }

  /**
   * Asks the adapter whether any entity matches `options`. No hooks and no read
   * cache are involved; `bypassCache` is stripped so the adapter receives the
   * same option shape as it does from `count` and `findAll`.
   */
  public async exists(options?: FindAllOptions): Promise {
    const optionsWithoutBypass = this.stripBypassCacheFromFindAllOptions(options);
    return PerformanceLogger.withSpanRoot(
      'SklEngine.exists',
      async() => this.queryAdapter.exists(optionsWithoutBypass),
      { options: optionsWithoutBypass }
    );
  }

  /** Counts entities matching `options` through the read cache (no hooks). */
  public async count(options?: FindAllOptions): Promise {
    const bypassCache = options?.bypassCache;
    const optionsWithoutBypass = this.stripBypassCacheFromFindAllOptions(options);
    return PerformanceLogger.withSpanRoot(
      'SklEngine.count',
      async() => await this.runReadWithCache({
        operation: 'count',
        args: [optionsWithoutBypass],
        bypassCache,
        execute: async() => await this.queryAdapter.count(optionsWithoutBypass)
      }),
      { options: optionsWithoutBypass }
    );
  }

  /**
   * Validates and saves one entity or an array of entities, running the create
   * hooks before, after and on error. Resolves to a single entity when a single
   * entity was passed, otherwise to the saved array.
   */
  public async save(entity: Entity, options?: WriteOptions): Promise;
  public async save(entities: Entity[], options?: WriteOptions): Promise;
  public async save(entityOrEntities: Entity | Entity[], options?: WriteOptions): Promise {
    return PerformanceLogger.withSpanRoot('SklEngine.save', async() => {
      const entityArray = Array.isArray(entityOrEntities) ?
entityOrEntities : [entityOrEntities];
      const isSingleEntity = !Array.isArray(entityOrEntities);
      await globalHooks.executeBeforeCreate(entityArray, { sklEngine: this, bypassHooks: options?.bypassHooks });
      try {
        await this.validateEntitiesConformToObjectSchema(entityArray);
        const savedEntities = await this.queryAdapter.save(entityArray);
        await globalHooks.executeAfterCreate(savedEntities, { sklEngine: this, bypassHooks: options?.bypassHooks });
        // Mirror the input shape: one entity in, one entity out.
        return isSingleEntity ? savedEntities[0] : savedEntities;
      } catch (error) {
        await globalHooks.executeErrorCreate(entityArray, error as Error, { sklEngine: this, bypassHooks: options?.bypassHooks });
        throw error;
      }
    }, { entityCount: Array.isArray(entityOrEntities) ? entityOrEntities.length : 1 });
  }

  /**
   * Updates the given attributes on one entity id or on an array of ids.
   *
   * The attributes are validated per id against the schemas of each entity
   * before the adapter is called; UPDATE hooks run before, after and on error.
   */
  public async update(id: string, attributes: Partial, options?: WriteOptions): Promise;
  public async update(ids: string[], attributes: Partial, options?: WriteOptions): Promise;
  public async update(idOrIds: string | string[], attributes: Partial, options?: WriteOptions): Promise {
    return PerformanceLogger.withSpanRoot('SklEngine.update', async() => {
      const idArray = Array.isArray(idOrIds) ? idOrIds : [idOrIds];
      const isSingleEntity = !Array.isArray(idOrIds);
      await globalHooks.execute(HookTypes.UPDATE, HookStages.BEFORE, {
        entities: [],
        operation: 'update',
        operationParameters: { idArray, attributes },
        sklEngine: this,
        bypassHooks: options?.bypassHooks
      });
      try {
        // Always validate through the per-id loop. It performs the same single
        // validation for one id, and for an empty array it validates nothing
        // instead of looking up the schemas of `undefined`.
        await this.validateEntitiesWithIdsConformsToObjectSchemaForAttributes(idArray, attributes);
        await this.queryAdapter.update(isSingleEntity ?
idArray[0] : (idArray as any), attributes);
        await globalHooks.execute(HookTypes.UPDATE, HookStages.AFTER, {
          entities: [],
          operation: 'update',
          operationParameters: { idArray, attributes },
          sklEngine: this,
          bypassHooks: options?.bypassHooks
        });
      } catch (error: unknown) {
        await globalHooks.execute(HookTypes.UPDATE, HookStages.ERROR, {
          entities: [],
          operation: 'update',
          operationParameters: { idArray, attributes },
          sklEngine: this,
          bypassHooks: options?.bypassHooks
        }, error);
        throw error;
      }
    }, { idCount: Array.isArray(idOrIds) ? idOrIds.length : 1 });
  }

  /**
   * Validates entities against the schema of each of their types.
   *
   * Entities are grouped by type. For every type that resolves to a stored
   * object, the group is checked against that object and each of its
   * superclasses used as a shape targeting the group's ids. Types without a
   * stored object are skipped.
   *
   * @throws via `throwValidationReportError` on the first non-conforming report.
   */
  public async validateEntitiesConformToObjectSchema(entities: Entity[]): Promise {
    const entitiesByType = this.groupEntitiesByType(entities);
    for (const [type, entitiesOfType] of Object.entries(entitiesByType)) {
      const schemaObject = await this.findByIfExists({ id: type });
      if (!schemaObject) {
        continue;
      }
      const ancestorObjects = await this.getSuperClassesOfObject(type);
      for (const currentObject of [schemaObject, ...ancestorObjects]) {
        const targetNodes = entitiesOfType.map(
          (entity): ReferenceNodeObject => ({ [PROP_ENTITY_ID]: entity[PROP_ENTITY_ID] })
        );
        const nounSchemaWithTarget = { ...currentObject, [SHACL.targetNode]: targetNodes };
        const report = await this.convertToQuadsAndValidateAgainstShape(entitiesOfType, nounSchemaWithTarget);
        if (report.conforms) {
          continue;
        }
        const entityIds = entitiesOfType.map((entity): string => entity[PROP_ENTITY_ID]);
        this.throwValidationReportError(
          report,
          `Entity ${entityIds.join(', ')} does not conform to the ${currentObject[PROP_ENTITY_ID]} schema.`
        );
      }
    }
  }

  /** Buckets entities under each of their types; a multi-typed entity lands in several buckets. */
  private groupEntitiesByType(entities: Entity[]): Record {
    return entities.reduce((groupedEntities: Record, entity): Record => {
      const entityTypes = Array.isArray(entity[PROP_ENTITY_TYPE]) ?
entity[PROP_ENTITY_TYPE] : [entity[PROP_ENTITY_TYPE]];
      for (const entityType of entityTypes) {
        if (groupedEntities[entityType]) {
          groupedEntities[entityType].push(entity);
        } else {
          groupedEntities[entityType] = [entity];
        }
      }
      return groupedEntities;
    }, {});
  }

  /** Finds the objects reachable from `object` over one or more `rdfs:subClassOf` steps. */
  private async getSuperClassesOfObject(object: string): Promise {
    const parents = await this.getParentsOfSelector(object);
    return parents;
  }

  /** Same as `getSuperClassesOfObject`, for any of the given ids. */
  private async getSuperClassesOfObjects(nouns: string[]): Promise {
    const parents = await this.getParentsOfSelector(In(nouns));
    return parents;
  }

  /** Queries entities whose id is the target of a one-or-more `rdfs:subClassOf` path starting at `selector`. */
  private async getParentsOfSelector(selector: string | FindOperator): Promise {
    const subClassChain = OneOrMorePath({ subPath: RDFS.subClassOf as string });
    const parentIdConstraint = InversePath({ subPath: subClassChain, value: selector });
    return await this.findAll({ where: { id: parentIdConstraint } });
  }

  /**
   * Validates one entity against the stored objects named by its types and
   * against their superclasses. Does nothing when none of the types resolves to
   * a stored object.
   */
  private async validateEntityConformsToObjectSchema(entity: Entity): Promise {
    const nounIds = Array.isArray(entity[PROP_ENTITY_TYPE]) ?
      entity[PROP_ENTITY_TYPE] :
      [entity[PROP_ENTITY_TYPE]];
    const directObjects = await this.findAllBy({ id: In(nounIds) });
    if (directObjects.length === 0) {
      return;
    }
    const existingObjectIds = directObjects.map((object): string => object[PROP_ENTITY_ID]);
    const parentObjects = await this.getSuperClassesOfObjects(existingObjectIds);
    for (const currentObject of [...directObjects, ...parentObjects]) {
      const targetNode = { [PROP_ENTITY_ID]: entity[PROP_ENTITY_ID] };
      const nounSchemaWithTarget = { ...currentObject, [SHACL.targetNode]: targetNode };
      const report = await this.convertToQuadsAndValidateAgainstShape(entity, nounSchemaWithTarget);
      if (!report.conforms) {
        this.throwValidationReportError(
          report,
          `Entity ${entity[PROP_ENTITY_ID]} does not conform to the ${currentObject[PROP_ENTITY_ID]} schema.`
        );
      }
    }
  }

  /** Validates `attributes` for each id in turn, stopping at the first failure. */
  private async validateEntitiesWithIdsConformsToObjectSchemaForAttributes(
    ids: string[],
    attributes: Partial
  ): Promise {
    for (const entityId of ids) {
      await this.validateEntityWithIdConformsToObjectSchemaForAttributes(entityId, attributes);
    }
  }

  /** Finds the objects reached from entity `id` via `rdf:type` followed by zero or more `rdfs:subClassOf` steps. */
  private async getObjectsAndParentObjectsOfEntity(id: string): Promise {
    return await this.findAllBy({ id: InversePath({ subPath: SequencePath({ subPath: [RDF.type,
ZeroOrMorePath({ subPath: RDFS.subClassOf as string })] }), value: id }) });
  }

  /**
   * Validates a partial attribute update for entity `id`.
   *
   * For every object (type or superclass) of the entity that declares SHACL
   * properties, only the property shapes whose path names one of the updated
   * attributes are kept; the attributes are then validated against a node shape
   * built from those property shapes and targeting `id`.
   *
   * @throws via `throwValidationReportError` on the first non-conforming report.
   */
  private async validateEntityWithIdConformsToObjectSchemaForAttributes(
    id: string,
    attributes: Partial
  ): Promise {
    const nouns = await this.getObjectsAndParentObjectsOfEntity(id);
    for (const currentObject of nouns) {
      if (SHACL.property in currentObject) {
        const nounProperties = ensureArray(currentObject[SHACL.property] as OrArray).filter(
          (property): boolean => {
            const path = property[SHACL.path];
            // Path given directly as a string.
            if (typeof path === 'string' && path in attributes) {
              return true;
            }
            // Path given as a node reference. `typeof null` is 'object' and the
            // `in` operator throws on null, so null is excluded explicitly.
            if (
              typeof path === 'object' &&
              path !== null &&
              PROP_ENTITY_ID in path &&
              (path[PROP_ENTITY_ID] as string) in attributes
            ) {
              return true;
            }
            return false;
          }
        );
        if (nounProperties.length > 0) {
          const nounSchemaWithTarget = {
            [PROP_ENTITY_TYPE]: SHACL.NodeShape,
            [SHACL.targetNode]: { [PROP_ENTITY_ID]: id },
            [SHACL.property]: nounProperties
          };
          const attributesWithId = { ...attributes, [PROP_ENTITY_ID]: id };
          const report = await this.convertToQuadsAndValidateAgainstShape(attributesWithId, nounSchemaWithTarget);
          if (!report.conforms) {
            this.throwValidationReportError(
              report,
              `Entity ${id} does not conform to the ${currentObject[PROP_ENTITY_ID]} schema.`
            );
          }
        }
      }
    }
  }

  /**
   * Deletes one entity id or an array of ids through the adapter, running the
   * DELETE hooks before, after and on error.
   */
  public async delete(id: string, options?: WriteOptions): Promise;
  public async delete(ids: string[], options?: WriteOptions): Promise;
  public async delete(idOrIds: string | string[], options?: WriteOptions): Promise {
    return PerformanceLogger.withSpanRoot('SklEngine.delete', async() => {
      const idArray = Array.isArray(idOrIds) ?
idOrIds : [idOrIds];
      await globalHooks.execute(HookTypes.DELETE, HookStages.BEFORE, {
        entities: [],
        operation: 'delete',
        operationParameters: { idArray },
        sklEngine: this,
        bypassHooks: options?.bypassHooks
      });
      try {
        await this.queryAdapter.delete(idArray);
        await globalHooks.execute(HookTypes.DELETE, HookStages.AFTER, {
          entities: [],
          operation: 'delete',
          operationParameters: { idArray },
          sklEngine: this,
          bypassHooks: options?.bypassHooks
        });
      } catch (error) {
        await globalHooks.execute(HookTypes.DELETE, HookStages.ERROR, {
          entities: [],
          operation: 'delete',
          operationParameters: { idArray },
          sklEngine: this,
          bypassHooks: options?.bypassHooks
        }, error);
        throw error;
      }
    }, { idCount: Array.isArray(idOrIds) ? idOrIds.length : 1 });
  }

  /** Destroys one entity or an array of entities through the adapter (no hooks). */
  public async destroy(entity: Entity): Promise;
  public async destroy(entities: Entity[]): Promise;
  public async destroy(entityOrEntities: Entity | Entity[]): Promise {
    // Both branches make the same call; the split only selects the adapter overload.
    if (Array.isArray(entityOrEntities)) {
      return await this.queryAdapter.destroy(entityOrEntities);
    }
    return await this.queryAdapter.destroy(entityOrEntities);
  }

  /** Delegates to the adapter's `destroyAll`. */
  public async destroyAll(): Promise {
    return await this.queryAdapter.destroyAll();
  }

  /**
   * Applies a mapping to `args`.
   *
   * When the mapping contains JavaScript code blocks, only those code blocks
   * are executed and their result is returned. Otherwise the mapping is applied
   * with a `Mapper` that receives the engine functions merged with the
   * per-call functions.
   *
   * @param args - Input data for the mapping.
   * @param mapping - One mapping node or an array of them.
   * @param frame - Optional frame; defaults to `{}`.
   * @param capabilityConfig - Per-call configuration; its `functions` override engine functions.
   * @param jsExecutionOptions - Options for code-block execution; defaults to `{}`.
   */
  public async performMapping(
    args: JSONValue,
    mapping: OrArray,
    frame?: Record,
    capabilityConfig?: CapabilityConfig,
    jsExecutionOptions?: ExecutionOptions
  ): Promise {
    const mappingArray = ensureArray(mapping);
    const codeBlocks = mappingArray.filter(
      (mappingItem: NodeObject): boolean =>
        mappingItem[PROP_ENTITY_TYPE] === EngineConstants.spec.codeBlock &&
        this.isJavaScriptCode(getValueIfDefined(mappingItem[EngineConstants.prop.codeBody])!)
    );
    // FIXME: Handle if we can combine the code blocks with triples map.
    // As of now if there is any code block, triples map does not get executed.
    if (codeBlocks.length > 0) {
      return await this.executeCodeBlocks(codeBlocks, args, jsExecutionOptions ??
{});
    }
    const mergedFunctions = { ...this.functions, ...capabilityConfig?.functions };
    const tripleMapper = new Mapper({ functions: mergedFunctions });
    return await tripleMapper.apply(args, mapping, frame ?? {});
  }

  /**
   * Handles a trigger for `integration`: resolves its trigger mapping, maps the
   * payload to capability arguments and a capability id, and executes that
   * capability when an id was produced.
   */
  public async executeTrigger(integration: string, payload: any): Promise {
    const triggerToCapabilityMapping = await this.findTriggerCapabilityMapping(integration);
    const capabilityArgs = await this.performParameterMappingOnArgsIfDefined(
      payload,
      triggerToCapabilityMapping as Partial | Partial
    );
    const capabilityId = await this.performCapabilityMappingWithArgs(payload, triggerToCapabilityMapping);
    if (!capabilityId) {
      return;
    }
    const mappedCapability = (await this.findBy({ id: capabilityId })) as Capability;
    await this.executeCapability(mappedCapability, capabilityArgs);
  }

  /**
   * Looks up the trigger capability mapping registered for `integration`.
   *
   * @throws Error when no such mapping exists.
   */
  private async findTriggerCapabilityMapping(integration: string): Promise {
    const mappingCriteria = {
      type: EngineConstants.spec.capabilityMapping,
      [EngineConstants.prop.capability]: integration,
      [EngineConstants.prop.capabilityType]: EngineConstants.spec.triggerCapabilityMapping
    };
    const foundMapping = (await this.findBy(
      mappingCriteria,
      `Failed to find a Trigger Capability mapping for integration ${integration}`
    )) as TriggerMapping;
    if (!foundMapping) {
      throw new Error(`Failed to find a Trigger Capability mapping for integration ${integration}`);
    }
    return foundMapping;
  }

  /** Resolves a capability by its label and executes it. */
  private async executeCapabilityByName(
    capabilityName: string,
    capabilityArgs: JSONObject,
    capabilityConfig?: CapabilityConfig
  ): Promise> {
    const namedCapability = await this.findCapabilityWithName(capabilityName);
    return await this.executeCapability(namedCapability, capabilityArgs, capabilityConfig);
  }

  /**
   * Finds the capability whose label equals `capabilityName`.
   *
   * @throws Error when the capability is not in the schema.
   */
  private async findCapabilityWithName(capabilityName: string): Promise {
    const capabilityCriteria = {
      type: EngineConstants.spec.capability,
      [EngineConstants.prop.label]: capabilityName
    };
    const found = await this.findBy(
      capabilityCriteria,
      `Failed to find the capability ${capabilityName} in the schema.`
    );
    return found as Capability;
  }

  /**
   * Executes a capability: resolves the mapping for the call, optionally
   * validates parameters and return value, and runs the mapping between the
   * capability-mapping hooks and the start/end callbacks.
   */
  public async executeCapability(
    capability: Capability,
    capabilityArgs: JSONObject,
capabilityConfig?: CapabilityConfig
  ): Promise> {
    // Start callbacks: the engine-wide callback first, then the per-call one.
    this.globalCallbacks?.onCapabilityStart?.(capability[PROP_ENTITY_ID], capabilityArgs);
    if (capabilityConfig?.callbacks?.onCapabilityStart) {
      this.logger.debug('Capability arguments', capabilityArgs);
      capabilityConfig.callbacks.onCapabilityStart(capability[PROP_ENTITY_ID], capabilityArgs);
    }
    const { mapping, account } = await this.findMappingForCapabilityContextually(
      capability[PROP_ENTITY_ID],
      capabilityArgs
    );
    this.logger.debug('Mapping', JSON.stringify(mapping));
    const shouldValidate = this.shouldValidate(capabilityConfig);
    if (shouldValidate) {
      await this.assertCapabilityParamsMatchParameterSchemas(capabilityArgs, capability);
    }
    try {
      // Execute capability mapping before hook if appropriate -
      // works for any mapping that can be used as a verb mapping
      if (mapping) {
        await globalHooks.executeBeforeExecuteCapabilityMapping(
          [capabilityArgs] as Entity[],
          mapping,
          { sklEngine: this }
        );
      }
      const verbReturnValue = await this.executeMapping(mapping, capabilityArgs, capabilityConfig, account);
      if (shouldValidate) {
        await this.assertCapabilityReturnValueMatchesReturnTypeSchema(verbReturnValue, capability);
      }
      // Execute capability mapping after hook if appropriate
      if (mapping) {
        await globalHooks.executeAfterExecuteCapabilityMapping(
          [capabilityArgs] as Entity[],
          mapping,
          verbReturnValue,
          { sklEngine: this }
        );
      }
      // End callbacks, in the same order as the start callbacks.
      this.globalCallbacks?.onCapabilityEnd?.(capability[PROP_ENTITY_ID], verbReturnValue);
      if (capabilityConfig?.callbacks?.onCapabilityEnd) {
        capabilityConfig.callbacks.onCapabilityEnd(capability[PROP_ENTITY_ID], verbReturnValue);
      }
      return verbReturnValue;
    } catch (error) {
      // Execute capability mapping error hook if appropriate
      if (mapping) {
        await globalHooks.executeErrorExecuteCapabilityMapping(
          [capabilityArgs] as Entity[],
          mapping,
          error as Error,
          { sklEngine: this }
        );
      }
      throw error;
    }
  }

  /**
   * Chooses the mapping used to run capability `capabilityId` for `args`.
   *
   * Tried in order: an explicit `args.mapping` id, a mapping for `args.object`,
   * a mapping for the integration of `args.account`, and finally the single
   * capability mapping that has neither an integration nor an object.
   *
   * @throws Error when the explicit mapping is missing, when several generic
   * mappings exist, or when nothing matches.
   */
  private async findMappingForCapabilityContextually(
    capabilityId: string,
    args: JSONObject
  ): Promise<{ mapping: CapabilityMapping;
account?: Entity }> {
    // 1. Explicit mapping id.
    if (args.mapping) {
      const mapping = await this.findByIfExists({ id: args.mapping as string });
      if (!mapping) {
        throw new Error(`Mapping ${args.mapping as string} not found.`);
      }
      return { mapping: mapping as CapabilityMapping };
    }
    // 2. Mapping bound to the given object.
    if (args.object) {
      const mapping = await this.findCapabilityObjectMapping(capabilityId, args.object as string);
      if (mapping) {
        return { mapping };
      }
    }
    // 3. Mapping bound to the integration of the given account.
    if (args.account) {
      const account = await this.findBy({ id: args.account as string });
      const integratedProductId = (account[EngineConstants.prop.integration] as ReferenceNodeObject)[PROP_ENTITY_ID];
      const mapping = await this.findCapabilityIntegrationMapping(capabilityId, integratedProductId);
      if (mapping) {
        return { mapping, account };
      }
    }
    // 4. Generic mapping: no integration and no object attached.
    const mappings = await this.findAllBy({
      type: EngineConstants.spec.capabilityMapping,
      [EngineConstants.prop.capability]: capabilityId,
      [EngineConstants.prop.integration]: Not(Exists()),
      [EngineConstants.prop.object]: Not(Exists())
    });
    if (mappings.length === 1) {
      return { mapping: mappings[0] as CapabilityMapping };
    }
    if (mappings.length > 1) {
      throw new Error('Multiple mappings found for capability, please specify one.');
    }
    if (args.object) {
      throw new Error(`Mapping between object ${args.object as string} and capability ${capabilityId} not found.`);
    }
    if (args.account) {
      throw new Error(`Mapping between account ${args.account as string} and capability ${capabilityId} not found.`);
    }
    throw new Error(`No mapping found.`);
  }

  /**
   * Executes a mapping against `args`.
   *
   * Steps: add pre-processed parameters, map the parameters, dispatch on the
   * mapping kind (operation, series, parallel, or plain parameter mapping), then
   * apply the return-value mapping.
   *
   * @param mapping - The mapping to execute.
   * @param args - Capability arguments.
   * @param capabilityConfig - Optional per-call configuration.
   * @param account - Account forwarded to operation mappings.
   */
  public async executeMapping(
    mapping: Mapping,
    args: JSONObject,
    capabilityConfig?: CapabilityConfig,
    account?: Entity
  ): Promise> {
    args = await this.addPreProcessingMappingToArgs(mapping, args, capabilityConfig);
    let returnValue: OrArray;
    // If (EngineConstants.prop.capability in mapping || EngineConstants.prop.capabilityMapping in mapping) {
    //   const capabilityId = await this.performCapabilityMappingWithArgs(args, mapping, capabilityConfig);
    //   const mappedArgs = await this.performParameterMappingOnArgsIfDefined(
    //     { ...args, capabilityId },
    //     mapping as MappingWithInputs,
    //     capabilityConfig
    //   );
    //   Logger.getInstance().log('Mapped args', mappedArgs);
    //   returnValue = await this.executeCapabilityMapping(mapping, args, mappedArgs, capabilityConfig);
    // } else {
    const mappedArgs = await this.performParameterMappingOnArgsIfDefined(
      args,
      mapping as MappingWithInputs,
      capabilityConfig
    );
    this.logger.debug('Mapped args', mappedArgs);
    if (EngineConstants.prop.operationId in mapping || EngineConstants.prop.operationMapping in mapping) {
      returnValue = (await this.executeOperationMapping(
        mapping,
        mappedArgs,
        args,
        account!,
        capabilityConfig
      )) as NodeObject;
    } else if (EngineConstants.prop.series in mapping) {
      returnValue = await this.executeSeriesMapping(mapping as MappingWithSeries, mappedArgs, capabilityConfig);
    } else if (EngineConstants.prop.parallel in mapping) {
      returnValue = await this.executeParallelMapping(mapping as MappingWithParallel, mappedArgs, capabilityConfig);
    } else {
      // No operation/series/parallel: the mapped parameters are the result.
      returnValue = mappedArgs;
    }
    // }
    return await this.performReturnValueMappingWithFrameIfDefined(
      returnValue as JSONValue,
      mapping as MappingWithOutputsMapping,
      capabilityConfig
    );
  }

  /**
   * Decides whether validation runs: the per-call `disableValidation` wins when
   * it is set, otherwise the engine-level flag applies.
   */
  private shouldValidate(capabilityConfig?: CapabilityConfig): boolean {
    return capabilityConfig?.disableValidation === undefined ?
this.disableValidation !== true :
      !capabilityConfig.disableValidation;
  }

  /**
   * Executes a mapping that names an operation.
   *
   * With an integration on the mapping, the operation is resolved and performed
   * against that integration; without one, the mapping is treated as a
   * capability mapping.
   */
  private async executeOperationMapping(
    mapping: Mapping,
    mappedArgs: JSONObject,
    originalArgs: JSONObject,
    account: Entity,
    capabilityConfig?: CapabilityConfig
  ): Promise> {
    const integration = (mapping as CapabilityMapping)[EngineConstants.prop.integration]?.[PROP_ENTITY_ID];
    // If the mapping has an integration, it means that the operation is an integration operation
    if (integration) {
      const operationInfo = await this.performOperationMappingWithArgs(originalArgs, mapping, capabilityConfig);
      const response = await this.performOperation(operationInfo, mappedArgs, originalArgs, account, capabilityConfig);
      // The response is only logged when `ifCapabilityStreaming` reports false.
      if (!this.ifCapabilityStreaming(capabilityConfig)) {
        this.logger.debug('Original response', JSON.stringify(response));
      }
      return response;
    }
    // If the mapping does not have an integration, it means that the operation is a capability operation
    return await this.executeCapabilityMapping(mapping, originalArgs, mappedArgs, capabilityConfig);
  }

  /**
   * Executes the mappings listed under the `series` property one after another,
   * starting with an empty previous return value and an empty list of step results.
   */
  private async executeSeriesMapping(
    mapping: MappingWithSeries,
    args: JSONObject,
    capabilityConfig?: CapabilityConfig
  ): Promise> {
    const seriesCapabilityMappingsList = this.rdfListToArray(mapping[EngineConstants.prop.series]!);
    const seriesCapabilityArgs = {
      originalCapabilityParameters: args,
      previousCapabilityReturnValue: {},
      allStepsResults: []
    };
    return await this.executeSeriesFromList(seriesCapabilityMappingsList, seriesCapabilityArgs, capabilityConfig);
  }

  /**
   * Converts a list node into an array. A node carrying `RML_LIST` already
   * holds the array; otherwise the `rdf:first` / `rdf:rest` chain is walked
   * recursively until `rdf:rest` is `rdf:nil`.
   */
  private rdfListToArray(list: { [RML_LIST]: CapabilityMapping[] } | RdfList): CapabilityMapping[] {
    if (!(RML_LIST in list)) {
      return [
        list[RDF.first],
        ...getIdFromNodeObjectIfDefined(list[RDF.rest] as ReferenceNodeObject) === RDF.nil ?
[] :
          this.rdfListToArray(list[RDF.rest] as RdfList)
      ];
    }
    return list[RML_LIST];
  }

  /**
   * Runs the mappings of a series in order. Each step after the first receives
   * the previous step's return value and the accumulated results of all earlier
   * steps; the return value of the last step is returned.
   */
  private async executeSeriesFromList(
    list: Mapping[],
    args: SeriesCapabilityArgs,
    capabilityConfig?: CapabilityConfig
  ): Promise> {
    let stepArgs = args;
    let returnValue = await this.executeMapping(list[0], stepArgs, capabilityConfig);
    for (const followingMapping of list.slice(1)) {
      stepArgs = {
        ...stepArgs,
        previousCapabilityReturnValue: returnValue as JSONObject,
        allStepsResults: [...stepArgs.allStepsResults ?? [], returnValue as JSONObject]
      };
      returnValue = await this.executeMapping(followingMapping, stepArgs, capabilityConfig);
    }
    return returnValue;
  }

  /**
   * Resolves the capability id for a mapping and dispatches it: built-in data
   * source capabilities and the execute-code capability are handled inline;
   * anything else goes to custom capabilities or to a stored capability.
   * Resolves to `{}` when no capability id is produced.
   */
  private async executeCapabilityMapping(
    capabilityMapping: Mapping,
    originalArgs: JSONObject,
    mappedArgs: JSONObject,
    capabilityConfig?: CapabilityConfig
  ): Promise> {
    const capabilityId = await this.performCapabilityMappingWithArgs(
      originalArgs,
      capabilityMapping,
      capabilityConfig
    );
    if (capabilityId) {
      const dataSource = EngineConstants.dataSource;
      if (capabilityId === dataSource.update) {
        await this.updateEntityFromcapabilityArgs(mappedArgs);
        return {};
      }
      if (capabilityId === dataSource.save) {
        return await this.saveEntityOrEntitiesFromcapabilityArgs(mappedArgs);
      }
      if (capabilityId === dataSource.destroy) {
        return await this.destroyEntityOrEntitiesFromcapabilityArgs(mappedArgs);
      }
      if (capabilityId === dataSource.findAll) {
        return await this.findAll(mappedArgs);
      }
      if (capabilityId === dataSource.find) {
        return await this.find(mappedArgs);
      }
      if (capabilityId === dataSource.count) {
        return await this.countAndWrapValueFromcapabilityArgs(mappedArgs);
      }
      if (capabilityId === dataSource.exists) {
        return await this.existsAndWrapValueFromcapabilityArgs(mappedArgs);
      }
      if (capabilityId === 'https://skl.ai/capability/execute-code') {
        const codeBlocks = ensureArray((capabilityMapping[EngineConstants.prop.codeBlocks] as any[]) ??
[]).filter( (mappingItem: any): boolean => mappingItem[PROP_ENTITY_TYPE] === EngineConstants.spec.codeBlock && this.isJavaScriptCode(getValueIfDefined(mappingItem[EngineConstants.prop.codeBody])!) ); return await this.executeCodeBlocks(codeBlocks, mappedArgs, {}); } // Check for custom capabilities if (globalCustomCapabilities.has(capabilityId)) { return await globalCustomCapabilities.execute(capabilityId, mappedArgs, this, capabilityConfig); } return await this.findAndExecuteCapability(capabilityId, mappedArgs, capabilityConfig); } return {}; } private async addPreProcessingMappingToArgs( capabilityMapping: Mapping, args: JSONObject, capabilityConfig?: CapabilityConfig ): Promise { if (EngineConstants.prop.preProcessingMapping in capabilityMapping) { const preMappingArgs = await this.performMapping( args, capabilityMapping[EngineConstants.prop.preProcessingMapping] as NodeObject, getValueIfDefined(capabilityMapping[EngineConstants.prop.preProcessingMappingFrame]), capabilityConfig ); return { ...args, preProcessedParameters: preMappingArgs as JSONObject }; } return args; } private replaceTypeAndId(entity: Record): Record { if (typeof entity !== 'object') { throw new Error('Entity is not an object'); } const clonedEntity = structuredClone(entity); if (clonedEntity[EngineConstants.prop.type]) { clonedEntity[PROP_ENTITY_TYPE] = clonedEntity[EngineConstants.prop.type]; } if (clonedEntity[EngineConstants.prop.identifier]) { clonedEntity[PROP_ENTITY_ID] = SKL_DATA_NAMESPACE + (clonedEntity[EngineConstants.prop.identifier] as string); } return clonedEntity; } private async updateEntityFromcapabilityArgs(args: Record): Promise { let ids = args.id ?? args.ids; if (!Array.isArray(ids)) { ids = [ids]; } // FIX: Temporary fix for the issue where the id always getting prefixed with the namespace ids = ids.map((id: string) => (id.startsWith('http') ? 
id : `${SKL_DATA_NAMESPACE}${id}`)); await this.update(ids, args.attributes); } private async saveEntityOrEntitiesFromcapabilityArgs(args: Record): Promise> { if (args.entity && typeof args.entity === 'object') { args.entity = this.replaceTypeAndId(args.entity); } if (args.entities && Array.isArray(args.entities)) { args.entities = args.entities.map(this.replaceTypeAndId); } return await this.save(args.entity ?? args.entities); } private async destroyEntityOrEntitiesFromcapabilityArgs(args: Record): Promise> { if (args.entity && typeof args.entity === 'object') { args.entity = this.replaceTypeAndId(args.entity); } if (args.entities && Array.isArray(args.entities)) { args.entities = args.entities.map(this.replaceTypeAndId); } return await this.destroy(args.entity ?? args.entities); } private async countAndWrapValueFromcapabilityArgs(args: Record): Promise { const count = await this.count(args); return { [EngineConstants.dataSource.countResult]: { [PROP_ENTITY_VALUE]: count, [PROP_ENTITY_TYPE]: XSD.integer } }; } private async existsAndWrapValueFromcapabilityArgs(args: Record): Promise { const exists = await this.exists(args); return { [EngineConstants.dataSource.existsResult]: { [PROP_ENTITY_VALUE]: exists, [PROP_ENTITY_TYPE]: XSD.boolean } }; } public async findAndExecuteCapability( capabilityId: string, args: Record, capabilityConfig?: CapabilityConfig ): Promise> { const capability = (await this.findBy({ id: capabilityId })) as Capability; return await this.executeCapability(capability, args, capabilityConfig); } private async executeParallelMapping( mapping: MappingWithParallel, args: JSONObject, capabilityConfig?: CapabilityConfig ): Promise { const parallelCapabilityMappings = ensureArray( mapping[EngineConstants.prop.parallel] as unknown as OrArray ); const nestedReturnValues = await Promise.all>>( parallelCapabilityMappings.map( (capabilityMapping): Promise> => this.executeMapping(capabilityMapping, args, capabilityConfig) ) ); return 
nestedReturnValues.flat(); } private async findCapabilityIntegrationMapping( capabilityId: string, integratedProductId: string ): Promise { return (await this.findByIfExists({ type: EngineConstants.spec.capabilityMapping, [EngineConstants.prop.capability]: capabilityId, [EngineConstants.prop.integration]: integratedProductId })) as CapabilityMapping; } private async performOperationMappingWithArgs( args: JSONValue, mapping: Mapping, capabilityConfig?: CapabilityConfig ): Promise { if (mapping[EngineConstants.prop.operationId]) { return { [EngineConstants.prop.operationId]: mapping[EngineConstants.prop.operationId] }; } if (mapping[EngineConstants.prop.dataSource]) { return { [EngineConstants.prop.dataSource]: mapping[EngineConstants.prop.dataSource] }; } return await this.performMapping( args, mapping[EngineConstants.prop.operationMapping] as OrArray, undefined, capabilityConfig ); } private async performOperation( operationInfo: NodeObject, operationArgs: JSONObject, originalArgs: JSONObject, account: Entity, capabilityConfig?: CapabilityConfig, securityCredentials?: Entity ): Promise { if (operationInfo[EngineConstants.prop.schemeName]) { return await this.performOauthSecuritySchemeStageWithCredentials( operationInfo, operationArgs, account, securityCredentials ); } if (operationInfo[EngineConstants.prop.dataSource]) { return await this.getDataFromDataSource( getIdFromNodeObjectIfDefined(operationInfo[EngineConstants.prop.dataSource] as string | ReferenceNodeObject)!, capabilityConfig ); } if (operationInfo[EngineConstants.prop.operationId]) { const response = await this.performOpenapiOperationWithCredentials( getValueIfDefined(operationInfo[EngineConstants.prop.operationId])!, operationArgs, account, capabilityConfig ); return this.axiosResponseAndParamsToOperationResponse(response, operationArgs, originalArgs); } throw new Error('Operation not supported.'); } private axiosResponseAndParamsToOperationResponse( response: AxiosResponse, operationParameters: 
JSONObject, originalArgs: JSONObject ): OperationResponse { return { operationParameters, originalCapabilityParameters: originalArgs, data: response.data, status: response.status, statusText: response.statusText, headers: response.headers, config: { headers: response.config.headers, method: response.config.method, url: response.config.url, data: response.config.data } as JSONObject }; } private async performReturnValueMappingWithFrameIfDefined( returnValue: JSONValue, mapping: MappingWithOutputsMapping, capabilityConfig?: CapabilityConfig ): Promise { if (EngineConstants.prop.outputsMapping in mapping) { return await this.performMapping( returnValue, mapping[EngineConstants.prop.outputsMapping], getValueIfDefined(mapping[EngineConstants.prop.outputsMappingFrame]), capabilityConfig ); } return returnValue as NodeObject; } private async performParameterMappingOnArgsIfDefined( args: JSONObject, mapping: Partial | Partial, capabilityConfig?: CapabilityConfig, convertToJsonDeep = false ): Promise> { if (EngineConstants.prop.inputsReference in mapping) { const reference = getValueIfDefined(mapping[EngineConstants.prop.inputsReference])!; return this.getDataAtReference(reference, args); } if (EngineConstants.prop.inputsMappingRef in mapping) { const reference = getValueIfDefined(mapping[EngineConstants.prop.inputsMappingRef])!; const referencedMapping = this.getDataAtReference(reference, args); if (!referencedMapping || referencedMapping?.length === 0) { return args; } // Handle inputsMappingFrameRef if present let frame; if (EngineConstants.prop.inputsMappingFrameRef in mapping) { const frameReference = getValueIfDefined(mapping[EngineConstants.prop.inputsMappingFrameRef])!; frame = this.getDataAtReference(frameReference, args); } else { // Use direct frame if provided frame = getValueIfDefined(mapping[EngineConstants.prop.inputsMappingFrame]); } // Perform mapping with the referenced mapping and frame const mappedData = await this.performMapping(args, referencedMapping, 
frame, capabilityConfig); return toJSON(mappedData, convertToJsonDeep); } if (EngineConstants.prop.inputsMapping in mapping) { const mappedData = await this.performMapping( args, (mapping as MappingWithInputs)[EngineConstants.prop.inputsMapping]!, getValueIfDefined(mapping[EngineConstants.prop.inputsMappingFrame]), capabilityConfig ); return toJSON(mappedData, convertToJsonDeep); } return args; } private getDataAtReference(reference: string, data: JSONObject): any { const results = JSONPath({ path: reference, json: data, resultType: 'value' }); const isArrayOfLengthOne = Array.isArray(results) && results.length === 1; let result = isArrayOfLengthOne ? results[0] : results; if (result && typeof result === 'object' && PROP_ENTITY_VALUE in result) { result = result[PROP_ENTITY_VALUE]; } return result; } private async getIntegrationInterface( integratedProductId: string, integrationType = EngineConstants.spec.integrationInterface ): Promise { if (integrationType === EngineConstants.spec.integrationInterface) { const integrationInterface = await this.findBy({ type: integrationType, [EngineConstants.prop.type]: EngineConstants.spec.openApi, [EngineConstants.prop.integration]: integratedProductId }); return integrationInterface; } // Add support for other integration types return null; } private async findSecurityCredentialsForAccountIfDefined(accountId: string): Promise { return await this.findByIfExists({ type: EngineConstants.spec.integrationAuthenticationCredential, [EngineConstants.prop.accountOrUser]: accountId }); } private async findgetOpenApiRuntimeAuthorizationCapabilityIfDefined(): Promise { return (await this.findByIfExists({ type: EngineConstants.spec.capability, [EngineConstants.prop.label]: OPEN_API_RUNTIME_AUTHORIZATION })) as Capability; } private async findRefreshCapabilityForIntegration(integrationId: string): Promise { const integration = await this.findByIfExists({ id: integrationId }); const configuredRefreshCapability = 
integration?.[EngineConstants.prop.refreshCapability]; if (configuredRefreshCapability) { const refreshCapabilityId = getIdFromNodeObjectIfDefined( configuredRefreshCapability as string | ReferenceNodeObject ); if (!refreshCapabilityId) { throw new Error(`Integration ${integrationId} has an invalid refresh capability reference.`); } const refreshCapability = (await this.findByIfExists({ id: refreshCapabilityId })) as Capability | undefined; if (!refreshCapability) { throw new Error( `Refresh capability ${refreshCapabilityId} configured for integration ${integrationId} was not found.` ); } return refreshCapability; } return await this.findCapabilityWithName('getOauthTokens'); } private async getRuntimeCredentialsWithSecurityCredentials( securityCredentials: Entity, integrationId: string, openApiOperationInformation: OperationWithPathInfo, operationArgs: JSONObject ): Promise { const getOpenApiRuntimeAuthorizationCapability = await this.findgetOpenApiRuntimeAuthorizationCapabilityIfDefined(); if (!getOpenApiRuntimeAuthorizationCapability) { return {}; } const mapping = await this.findCapabilityIntegrationMapping( getOpenApiRuntimeAuthorizationCapability[PROP_ENTITY_ID], integrationId ); if (!mapping) { return {}; } const args = { securityCredentials, openApiExecutorOperationWithPathInfo: openApiOperationInformation, operationArgs } as JSONObject; const operationInfoJsonLd = await this.performParameterMappingOnArgsIfDefined(args, mapping, undefined, true); const headers = getValueIfDefined(operationInfoJsonLd[EngineConstants.prop.headers]); return headers ?? 
{}; } private async createOpenApiOperationExecutorWithSpec(openApiDescription: OpenApi): Promise { const executor = new OpenApiOperationExecutor(); await executor.setOpenapiSpec(openApiDescription); return executor; } private async findCapabilityObjectMapping(capabilityId: string, object: string): Promise { return (await this.findByIfExists({ type: EngineConstants.spec.capabilityMapping, [EngineConstants.prop.capability]: capabilityId, [EngineConstants.prop.object]: InversePath({ subPath: ZeroOrMorePath({ subPath: RDFS.subClassOf as string }), value: object }) })) as CapabilityMapping; } private async performCapabilityMappingWithArgs( args: JSONValue, mapping: Mapping, capabilityConfig?: CapabilityConfig ): Promise { if (mapping[EngineConstants.prop.operationId]) { return getValueIfDefined(mapping[EngineConstants.prop.operationId])!; } if (mapping[EngineConstants.prop.operationId]) { return getValueIfDefined(mapping[EngineConstants.prop.operationId])!; } const capabilityInfoJsonLd = await this.performMapping( args, mapping[EngineConstants.prop.operationMapping] as NodeObject, undefined, capabilityConfig ); return getValueIfDefined(capabilityInfoJsonLd[EngineConstants.prop.operationId])!; } private async assertCapabilityParamsMatchParameterSchemas( capabilityParams: any, capability: Capability ): Promise { let parametersSchemaObject = capability[EngineConstants.prop.inputs]; if (parametersSchemaObject?.[PROP_ENTITY_ID] && Object.keys(parametersSchemaObject).length === 1) { parametersSchemaObject = await this.findBy({ id: parametersSchemaObject[PROP_ENTITY_ID] }); } if (capabilityParams && parametersSchemaObject) { const capabilityParamsAsJsonLd = { '@context': getValueIfDefined(capability[EngineConstants.prop.inputsContext]), [PROP_ENTITY_TYPE]: EngineConstants.spec.inputs, ...capabilityParams }; const report = await this.convertToQuadsAndValidateAgainstShape(capabilityParamsAsJsonLd, parametersSchemaObject); if (!report.conforms) { this.throwValidationReportError( 
report, `${getValueIfDefined(capability[EngineConstants.prop.label])} parameters do not conform to the schema` ); } } } private async performOpenapiOperationWithCredentials( operationId: string, operationArgs: JSONObject, account: Entity, capabilityConfig?: CapabilityConfig ): Promise { const integratedProductId = (account[EngineConstants.prop.integration] as ReferenceNodeObject)[PROP_ENTITY_ID]; const restAPIInterface = await this.getIntegrationInterface(integratedProductId); if (!restAPIInterface) { throw new Error(`No integration interface found for integrated product ${integratedProductId}`); } const openApiDescription = getValueIfDefined( restAPIInterface[EngineConstants.prop.declarativeApiDescription] )!; const openApiExecutor = await this.createOpenApiOperationExecutorWithSpec(openApiDescription); // eslint-disable-next-line @typescript-eslint/await-thenable const openApiOperationInformation = await openApiExecutor.getOperationWithPathInfoMatchingOperationId(operationId); const securityCredentials = await this.findSecurityCredentialsForAccountIfDefined(account[PROP_ENTITY_ID]); let runtimeAuthorization: JSONObject = {}; if (securityCredentials) { const generatedRuntimeCredentials = await this.getRuntimeCredentialsWithSecurityCredentials( securityCredentials, integratedProductId, openApiOperationInformation, operationArgs ); if (generatedRuntimeCredentials && Object.keys(generatedRuntimeCredentials).length > 0) { runtimeAuthorization = generatedRuntimeCredentials; } } const apiKey = [ getValueIfDefined(securityCredentials?.[EngineConstants.prop.apiKey]), this.getAuthorizationHeaderFromRuntimeCredentials(runtimeAuthorization) ].find(Boolean); const configuration = { accessToken: getValueIfDefined(securityCredentials?.[EngineConstants.prop.accessToken]), bearerToken: getValueIfDefined(securityCredentials?.[EngineConstants.prop.bearerToken]), apiKey, basePath: getValueIfDefined(account[EngineConstants.prop.overrideBasePath]), username: 
getValueIfDefined(securityCredentials?.[EngineConstants.prop.username]), password: getValueIfDefined(securityCredentials?.[EngineConstants.prop.password]) }; let response; let executeOperationOptions: AxiosRequestConfig | undefined; try { const additionalHeaders = this.getHeadersFromRuntimeCredentials(runtimeAuthorization) as any; if ( additionalHeaders && typeof additionalHeaders === 'object' && !Array.isArray(additionalHeaders) && Object.keys(additionalHeaders).length > 0 ) { executeOperationOptions = { headers: additionalHeaders }; } if (this.ifCapabilityStreaming(capabilityConfig)) { executeOperationOptions = { ...executeOperationOptions, responseType: 'stream' }; } else if (this.ifCapabilityBuffering(capabilityConfig)) { executeOperationOptions = { ...executeOperationOptions, responseType: 'arraybuffer' }; } if (capabilityConfig?.axiosOptions && typeof capabilityConfig.axiosOptions === 'object' && Object.keys(capabilityConfig.axiosOptions).length > 0) { executeOperationOptions = { ...executeOperationOptions, ...capabilityConfig.axiosOptions }; } response = await openApiExecutor.executeOperation( operationId, configuration, operationArgs, executeOperationOptions ); } catch (error: unknown) { if ( axios.isAxiosError(error) && await this.isInvalidTokenError(error, integratedProductId) && securityCredentials ) { const refreshCapabilityConfig = capabilityConfig ? 
{ ...capabilityConfig, stream: false, buffer: false } : undefined; const refreshedConfiguration = await this.refreshSecurityCredentials( securityCredentials, integratedProductId, account, refreshCapabilityConfig ); response = await openApiExecutor.executeOperation( operationId, refreshedConfiguration, operationArgs, executeOperationOptions ); } else { throw error; } } return response; } private getHeadersFromRuntimeCredentials(runtimeCredentials: JSONObject): JSONObject { let returnValue: JSONObject = {}; if ( runtimeCredentials.headers && typeof runtimeCredentials.headers === 'object' && Object.keys(runtimeCredentials.headers).length > 0 && !Array.isArray(runtimeCredentials.headers) ) { returnValue = runtimeCredentials.headers; } return returnValue; } private getAuthorizationHeaderFromRuntimeCredentials(runtimeCredentials: JSONObject): string | undefined { const headers = this.getHeadersFromRuntimeCredentials(runtimeCredentials); if (headers && 'Authorization' in headers) { const authorizationHeader = headers.Authorization; if (typeof authorizationHeader === 'string') { return authorizationHeader; } } return undefined; } private async isInvalidTokenError(error: AxiosError, integratedProductId: string): Promise { const integrationInterface = await this.getIntegrationInterface(integratedProductId); if (!integrationInterface) { return false; } const errorMatcher = integrationInterface[EngineConstants.prop.invalidTokenErrorMatcher] as NodeObject; const errorMatcherStatus = errorMatcher && getValueIfDefined(errorMatcher[EngineConstants.prop.invalidTokenErrorMatcherStatus]); const errorMatcherRegex = errorMatcher && getValueIfDefined(errorMatcher[EngineConstants.prop.invalidTokenErrorMatcherMessageRegex])!; if (errorMatcher && error.response?.status === errorMatcherStatus) { if (!errorMatcherRegex) { return true; } if (error.response?.statusText && new RegExp(errorMatcherRegex, 'u').test(error.response?.statusText)) { return true; } } return false; } private async 
refreshSecurityCredentials( securityCredentials: Entity, integrationId: string, account: Entity, capabilityConfig?: CapabilityConfig ): Promise { const refreshCapability = await this.findRefreshCapabilityForIntegration(integrationId); const mapping = await this.findCapabilityIntegrationMapping(refreshCapability[PROP_ENTITY_ID], integrationId); if (!mapping) { throw new Error( `No mapping found for capability ${refreshCapability[PROP_ENTITY_ID]} and integration ${integrationId}` ); } const args = { refreshToken: getValueIfDefined(securityCredentials[EngineConstants.prop.refreshToken])!, jwtBearerOptions: getValueIfDefined(securityCredentials[EngineConstants.prop.jwtBearerOptions])!, securityCredentials: securityCredentials as unknown as JSONValue, account: account as unknown as JSONValue, integrationId } as JSONObject; const operationArgs = await this.performParameterMappingOnArgsIfDefined(args, mapping, capabilityConfig, true); const operationInfoJsonLd = await this.performOperationMappingWithArgs({}, mapping, capabilityConfig); const rawReturnValue = await this.performOperation( operationInfoJsonLd, operationArgs, args, account, capabilityConfig, securityCredentials ); const mappedReturnValue = await this.performReturnValueMappingWithFrameIfDefined( rawReturnValue, mapping as MappingWithOutputsMapping, capabilityConfig ); await this.assertCapabilityReturnValueMatchesReturnTypeSchema(mappedReturnValue, refreshCapability); const bearerToken = getValueIfDefined(mappedReturnValue[EngineConstants.prop.bearerToken]); const accessToken = getValueIfDefined(mappedReturnValue[EngineConstants.prop.accessToken]); const refreshToken = getValueIfDefined(mappedReturnValue[EngineConstants.prop.refreshToken]); if (bearerToken) { securityCredentials[EngineConstants.prop.bearerToken] = bearerToken; } if (accessToken) { securityCredentials[EngineConstants.prop.accessToken] = accessToken; } if (refreshToken) { securityCredentials[EngineConstants.prop.refreshToken] = refreshToken; } 
await this.save(securityCredentials); return { accessToken, bearerToken }; } private getOauthConfigurationFromSecurityCredentials(securityCredentialsSchema: Entity): OpenApiClientConfiguration { const username = getValueIfDefined(securityCredentialsSchema[EngineConstants.prop.username]); const password = getValueIfDefined(securityCredentialsSchema[EngineConstants.prop.password]); const accessToken = getValueIfDefined(securityCredentialsSchema[EngineConstants.prop.accessToken]); return { username, password, accessToken }; } private async assertCapabilityReturnValueMatchesReturnTypeSchema( returnValue: OrArray, capability: Capability ): Promise { let returnTypeSchemaObject = capability[EngineConstants.prop.outputs]; if (returnTypeSchemaObject?.[PROP_ENTITY_ID] && Object.keys(returnTypeSchemaObject).length === 1) { returnTypeSchemaObject = await this.findBy({ id: returnTypeSchemaObject[PROP_ENTITY_ID] }); } let report: ValidationReport | undefined; if (returnValue && returnTypeSchemaObject) { if (Array.isArray(returnValue)) { if (returnValue.some((valueItem): boolean => PROP_ENTITY_ID in valueItem)) { returnTypeSchemaObject[SHACL.targetNode] = returnValue.reduce( (nodes: ReferenceNodeObject[], outputItem): ReferenceNodeObject[] => { if (outputItem[PROP_ENTITY_ID]) { nodes.push({ [PROP_ENTITY_ID]: outputItem[PROP_ENTITY_ID] }); } return nodes; }, [] ); } else { const targetClasses = returnValue.reduce( (nodes: ReferenceNodeObject[], outputItem): ReferenceNodeObject[] => { if (outputItem[PROP_ENTITY_TYPE]) { const type = Array.isArray(outputItem[PROP_ENTITY_TYPE]) ? 
outputItem[PROP_ENTITY_TYPE][0] : outputItem[PROP_ENTITY_TYPE]; if (!nodes.includes({ [PROP_ENTITY_ID]: type })) { nodes.push({ [PROP_ENTITY_ID]: type }); } } return nodes; }, [] ); if (targetClasses.length > 0) { returnTypeSchemaObject[SHACL.targetClass] = targetClasses; } } report = await this.convertToQuadsAndValidateAgainstShape(returnValue, returnTypeSchemaObject); } else if (Object.keys(returnValue).length > 0) { if (returnValue[PROP_ENTITY_ID]) { returnTypeSchemaObject[SHACL.targetNode] = { [PROP_ENTITY_ID]: returnValue[PROP_ENTITY_ID] }; } else if (returnValue[PROP_ENTITY_TYPE]) { returnTypeSchemaObject[SHACL.targetClass] = { [PROP_ENTITY_ID]: Array.isArray(returnValue[PROP_ENTITY_TYPE]) ? returnValue[PROP_ENTITY_TYPE][0] : returnValue[PROP_ENTITY_TYPE]! }; } report = await this.convertToQuadsAndValidateAgainstShape(returnValue, returnTypeSchemaObject); } } if (report && !report?.conforms) { throw new Error( `Return value ${ Array.isArray(returnValue) ? 'array' : returnValue[PROP_ENTITY_ID] } does not conform to the schema` ); } } private async convertToQuadsAndValidateAgainstShape( value: OrArray, shape: NodeObject ): Promise { const valueAsQuads = await convertJsonLdToQuads(Array.isArray(value) ? 
value : [value]); const shapeQuads = await convertJsonLdToQuads(shape); const validator = new SHACLValidator(shapeQuads); return validator.validate(valueAsQuads); } private async performOauthSecuritySchemeStageWithCredentials( operationInfo: NodeObject, operationParameters: JSONObject, account: Entity, securityCredentials?: Entity ): Promise { const integratedProductId = (account[EngineConstants.prop.integration] as ReferenceNodeObject)[PROP_ENTITY_ID]; const restAPIInterface = await this.getIntegrationInterface(integratedProductId); if (!restAPIInterface) { throw new Error(`No integration interface found for integrated product ${integratedProductId}`); } const openApiDescription = getValueIfDefined( restAPIInterface[EngineConstants.prop.declarativeApiDescription] )!; securityCredentials ||= await this.findSecurityCredentialsForAccountIfDefined(account[PROP_ENTITY_ID]); let configuration: OpenApiClientConfiguration; if (securityCredentials) { configuration = this.getOauthConfigurationFromSecurityCredentials(securityCredentials); operationParameters.client_id = getValueIfDefined(securityCredentials[EngineConstants.prop.username])!; } else { configuration = {}; } const openApiExecutor = await this.createOpenApiOperationExecutorWithSpec(openApiDescription); const response = await openApiExecutor.executeSecuritySchemeStage( getValueIfDefined(operationInfo[EngineConstants.prop.schemeName])!, getValueIfDefined(operationInfo[EngineConstants.prop.oauthFlow])!, getValueIfDefined(operationInfo[EngineConstants.prop.stage])!, configuration, operationParameters ); if ('codeVerifier' in response && 'authorizationUrl' in response) { return { data: response as unknown as JSONObject, operationParameters }; } return this.axiosResponseAndParamsToOperationResponse(response, operationParameters, operationParameters); } private async getDataFromDataSource( dataSourceId: string, capabilityConfig?: CapabilityConfig ): Promise { const dataSource = await this.findBy({ id: dataSourceId }); 
if (dataSource[PROP_ENTITY_TYPE] === EngineConstants.spec.jsonDataSource) { const data = this.getDataFromJsonDataSource(dataSource, capabilityConfig); return { data, operationParameters: {}}; } throw new Error(`DataSource type ${dataSource[PROP_ENTITY_TYPE]} is not supported.`); } private getDataFromJsonDataSource(dataSource: NodeObject, capabilityConfig?: CapabilityConfig): JSONObject { if (dataSource[EngineConstants.prop.source]) { const sourceValue = getValueIfDefined(dataSource[EngineConstants.prop.source])!; return this.getJsonDataFromSource(sourceValue, capabilityConfig); } return getValueIfDefined(dataSource[EngineConstants.prop.data])!; } private getJsonDataFromSource(source: string, capabilityConfig?: CapabilityConfig): JSONObject { const inputFiles = { ...this.inputFiles, ...capabilityConfig?.inputFiles }; if (source in inputFiles) { const file = inputFiles[source]; if (typeof file === 'string') { return JSON.parse(file); } return file; } // eslint-disable-next-line unicorn/expiring-todo-comments // TODO add support for remote sources throw new Error(`Failed to get data from source ${source}`); } private throwValidationReportError(report: ValidationReport, errorMessage: string): void { const reportMessages = this.validationReportToMessages(report); throw new Error(`${errorMessage}\n\n${reportMessages.join('\n')}`); } private validationReportToMessages(report: ValidationReport): string[] { const reportMessages = []; for (const result of report.results) { const pathValue = result.path?.value; if (result.message.length === 0) { const message = `${pathValue}: Invalid due to ${result.sourceConstraintComponent?.value}`; reportMessages.push(message); } else { const resultMessages = result.message.map((message): string => `${message.value}`).join(', '); const message = `${pathValue}: ${resultMessages}`; reportMessages.push(message); } } return reportMessages; } private ifCapabilityStreaming(capabilityConfig?: CapabilityConfig): boolean { return 
Boolean(capabilityConfig && 'stream' in capabilityConfig && capabilityConfig.stream); } private ifCapabilityBuffering(capabilityConfig?: CapabilityConfig): boolean { return Boolean(capabilityConfig && 'buffer' in capabilityConfig && capabilityConfig.buffer); } private isJavaScriptCode(text: string): boolean { if (!text) { return false; } // Trim the text to handle leading/trailing whitespace const trimmed = text.trim(); // First, quickly check if it looks like JSON (starts with { or [) const looksLikeJson = /^\s*[[{]/.test(trimmed) && /[\]}]\s*$/.test(trimmed); if (looksLikeJson) { try { // If it can be parsed as JSON, it's likely an RML mapping JSON.parse(trimmed); return false; } catch { // If it looks like JSON but can't be parsed, it might be code // Continue with other checks } } // Check for common JavaScript patterns that wouldn't appear in JSON const jsPatterns = [ /function\s+[$A-Z_a-z][\w$]*\s*\(/, // Function declaration /const\s+[$A-Z_a-z][\w$]*/, // Const declaration /let\s+[$A-Z_a-z][\w$]*/, // Let declaration /var\s+[$A-Z_a-z][\w$]*/, // Var declaration /if\s*\(/, // If statement /for\s*\(/, // For loop /while\s*\(/, // While loop /=>|return |console\./, // Arrow functions, return statements, console usage /import\s+|export\s+/, // ES6 imports/exports /class\s+[$A-Z_a-z][\w$]*/, // Class declaration /\/\/|\/\*|\*\// // Comments ]; // If it matches any JS pattern, it's likely code return jsPatterns.some(pattern => pattern.test(trimmed)); } private ensureNodeObject(result: any): NodeObject { // Handle null/undefined results if (result === null || result === undefined) { return {}; } // If it's already a NodeObject with proper formatting, return as is if ( typeof result === 'object' && !Array.isArray(result) && (result[PROP_ENTITY_ID] !== undefined || result[PROP_ENTITY_TYPE] !== undefined || result['@context'] !== undefined || result['@graph'] !== undefined) ) { return result as NodeObject; } // Handle arrays - convert to @graph if 
(Array.isArray(result)) { // Check if items in array are already objects const convertedItems = result.map(item => { if (typeof item === 'object' && item !== null) { // Already object, but ensure it has @type if not present if (!item[PROP_ENTITY_TYPE] && !item[PROP_ENTITY_ID] && !item[PROP_ENTITY_VALUE]) { return { [PROP_ENTITY_TYPE]: 'skl:Result', ...item }; } return item; } // Convert primitives to objects with @value return { [PROP_ENTITY_VALUE]: item }; }); return { '@context': { skl: SKL_NAMESPACE }, '@graph': convertedItems }; } // Handle primitive values (string, number, boolean) if (typeof result !== 'object') { return { [PROP_ENTITY_VALUE]: result }; } // Handle plain objects (not arrays, not null) // If it's a plain object but not in NodeObject format, add @type return { [PROP_ENTITY_TYPE]: 'skl:Result', '@context': { skl: SKL_NAMESPACE }, ...result }; } private async executeCodeBlocks( codeBlocks: NodeObject[], args: JSONValue, executionOptions: ExecutionOptions ): Promise { const results = []; let updatedArgs = (args as Record) || {}; for (const codeBlock of codeBlocks) { const code = getValueIfDefined(codeBlock[EngineConstants.prop.codeBody])!; if (code) { const result = await this.executeJavaScriptMapping(updatedArgs, code as string, executionOptions); updatedArgs = { ...updatedArgs, ...result }; results.push(result); } } return results.length > 0 ? results[results.length - 1] : {}; } private async executeJavaScriptMapping( args: JSONValue, code: string, _executionOptions: ExecutionOptions ): Promise { try { // Configure execution options const executionOptions: ExecutionOptions = { timeout: 60 * 60 * 1000, // 1 hour functionName: 'main', allowNetwork: true, allowedDomains: [], allowEnv: true, debugMode: true, allowRead: true, ..._executionOptions }; if (!this.codeExecutor) { throw new Error('Code executor not set. 
Please set a code executor using setCodeExecutor()'); } // Execute the code const executionResult = await this.codeExecutor.execute(code, args as Record, executionOptions); // Handle execution errors if (executionResult?.error) { throw new Error(`Code execution error: ${executionResult.error?.message ?? 'Unknown error'}`); } // This is a patch to return the output as it is without converting to NodeObject // This is needed for some api inputs. We can try doing this using the frame. if (executionResult?.result?.rawOutput) { return executionResult?.result?.rawOutput; } // Convert the result to a NodeObject return this.ensureNodeObject(executionResult?.result ?? executionResult); } catch (error: unknown) { throw new Error( `Failed to execute JavaScript mapping: ${error instanceof Error ? error.message : String(error)}` ); } } // Expose global hooks to users // eslint-disable-next-line @typescript-eslint/explicit-member-accessibility get hooks(): typeof globalHooks { return globalHooks; } public async prepareContextForType(typeSpecOrId: any): Promise> { const typeSpecs = []; if (typeof typeSpecOrId === 'string') { typeSpecs.push( await this.getSuperClassesOfObject(typeSpecOrId), await this.findBy({ id: typeSpecOrId }) ); } else { typeSpecs.push(typeSpecOrId); } const context: Record = {}; for (const typeSpec of typeSpecs) { const properties = typeSpec[SHACL.property] ?? []; for (const property of ensureArray(properties)) { const dataType = property[SHACL.datatype]?.['@id']; const nodeKind = property[SHACL.nodeKind]?.['@id']; const path = property[SHACL.path]?.['@id']; if (nodeKind === SHACL.IRI) { context[path] = { '@type': '@id' }; } else { context[path] = { '@type': dataType }; } } } return context; } }