import * as assert from "assert"; import * as _ from "lodash"; import { IChassisContext, IChassisPlugin, IChassisMiddleware, IOperation, } from "../index"; import { Request, Response, Handler } from "express"; import * as os from "os"; import { Operation } from "../openapi/"; import { compose } from "compose-middleware"; import { Utils } from "../helpers"; // import APIError from "../core/APIError"; /** * plugin to create a pipeline of middleware (including before/after policies) * * @param context * @param options * @returns {exports} */ export class PipelinePlugin implements IChassisPlugin { name = "pipeline"; title = "Inject a set of middleware into request/response flow"; context: IChassisContext; policies: any = { all: [], before: {}, after: {} }; middleware: PipelineMiddleware; install(context: IChassisContext, _options: any) { this.context = context; // global policies this.policies = _.extend({ all: {}, before: {}, after: {} }, _options); this.middleware = new PipelineMiddleware(this); context.middleware.set(this.middleware); context.log({ code: "api:pipeline:installed", plugin: this.name, configs: _.keys(this.policies), }); } } /** * generate middleware that excecutes pipeline for an operation/feature including before/after policies */ export class PipelineMiddleware implements IChassisMiddleware { name = "api.pipeline"; title = "Pipeline middleware"; constructor(protected plugin: PipelinePlugin, _name?: string) { this.name = _name || this.name; } fn(oper: IOperation, _ignored: any): Function { let pipeline = this.pipleine(oper); // doesn't support post-install changes to policies return compose(pipeline); } pipleine(oper: IOperation): Handler[] { let context: IChassisContext = this.plugin.context; // prepare the before/after pipelines let policies = _.extend( { all: {}, before: {}, after: {} }, this.plugin.policies, oper.feature ); // prioritize middleware for all pipeline stages let all: Handler[] = this.prioritize( this.policy_middleware(oper, policies.all) ); let before: Handler[] = this.prioritize( this.policy_middleware(oper, policies.before) ); let after: Handler[] = this.prioritize( this.policy_middleware(oper, policies.after) ); let names = [] .concat(_.keys(policies.all)) .concat(_.keys(policies.before)) .concat([">-<"]) .concat(_.keys(policies.after)) .concat(["=="]); // afterware hijack's the response - to inject 'after' policies let afterware_fn = this.afterware(oper, after, names); // start our pipeline let pipeline = [afterware_fn as Handler].concat(all); pipeline = pipeline.concat(before); // pipeline uses named middleware let featureId = oper.featureId; if (featureId && featureId != this.name) { let middleware = this.feature(featureId); assert(middleware, "unknown feature: " + featureId); names.push(featureId); let handler = middleware.fn(oper, oper.feature) as Handler; handler && pipeline.push(handler); } else { _.each(oper.feature, (feature: any, feature_id: string) => { if (feature_id && _.isObject(feature)) { let middleware = this.feature(feature_id); if (middleware) { names.push(feature_id); let handler = middleware.fn(oper, feature) as Handler; handler && pipeline.push(handler); } } }); } context.log({ code: "api:pipeline:created", featureId: oper.featureId, pipeline: names, length: pipeline.length, resource: oper.resource, method: oper.actionId, operationId: oper.operationId, before: _.keys(policies.before), after: _.keys(policies.after), all: _.keys(policies.all), }); return pipeline; } /** * some sugar to make finding middleware sweeter .. * * @param name */ public feature(name: string) { let context: IChassisContext = this.plugin.context; return ( context.middleware.get(name) || context.middleware.get("api." + name) || context.middleware.get("a6s." + name) ); } prioritize(policies: any): Handler[] { let sorted = _.sortBy(policies, (p) => { return p.priority || 0; }); let pipeline: Handler[] = []; _.each(sorted, (policy) => { pipeline.push(policy.fn); }); return pipeline; } /** * Turn a set of named middleware configs into a pipeline * * @param oper * @param configs */ policy_middleware(oper: IOperation, configs: any): any { let context: IChassisContext = this.plugin.context; let policies = {}; // merge plugin, middleware and operation options let i = 1; for (let _name in configs) { let _config = _.extend( { enabled: true, name: _name }, configs[_name] ); assert(_config, "missing policy: " + _name); let name = _config.name; let _feature = this.feature(name); assert(_feature, " unknown middleware: " + name); if (_config && "object" === typeof _config) { _config.priority = _config.priority = 10 * i++; let _options = _.extend({ enabled: true, fn: null }, _config); if (_options.enabled !== false) { _options.fn = _feature.fn(oper, _options); policies[name] = _options; } context.log({ code: "api:pipeline:policy", featureId: oper.featureId, pipeline: name, resource: oper.resource, }); } } return policies; } /** * Creates 'express' middleware that allows policies to be executed 'after' the response 'res.end()' is generated. * * @param oper * @param after */ afterware( oper: IOperation, after: Function[], pipeline_names?: string[] ): Function { let context: IChassisContext = this.plugin.context; return (req: Request, res: Response, next: Function) => { let now = Date.now(); let uuid = this.request_uuid(req, now); res.setHeader("X-REQUEST-ID", uuid); let self = this; let deferred_end = res.end; res.end = function () { // if (arguments.length) { // console.log("sending: %j", arguments[0]); // deferred_end.apply(res, arguments); // return; // } // prevent end() being called twice // if (this._afterware) { // context.log({ // code: "api:pipeline:afterware:cycle", // operationId: oper.operationId, // resource: oper.resource, // method: oper.actionId, // request_id: uuid // }); // throw new APIError(500, { code: "api:pipeline:cycle" }); // } this._afterware = deferred_end; // run 'after' pipeline try { after.forEach((fn) => { fn(req, res, next); }); context.log({ code: "api:pipeline:afterware:end", pipeline: pipeline_names, operationId: oper.operationId, featureId: oper.featureId, resource: oper.resource, method: oper.actionId, request_id: uuid, }); } catch (err) { // trying to modify headers after sent context.error({ code: "api:pipeline:afterware:failed", pipeline: pipeline_names, operationId: oper.operationId, featureId: oper.featureId, resource: oper.resource, method: oper.actionId, request_id: uuid, }); } // res.setHeader("X-REQUEST-DURATION", _elapsed); // finally, send response to the HTTP client deferred_end.apply(res, arguments); // proof of industry :-) if (!arguments.length) { let _elapsed = Date.now() - now; self.metrics(oper, uuid, req); context.log({ code: "api:pipeline:done", message: "" + this.statusMessage + " " + this.statusCode, operationId: oper.operationId, actionId: oper.actionId, request_id: uuid, path: req.path, resource: oper.resource, elapsed: _elapsed, }); } }; next(); }; } /** * basic request tracking accounting * * @param oper * @param request_uuid * @param req */ metrics(oper: IOperation, request_id: string, req: Request) { let context: IChassisContext = this.plugin.context; let count_responses = "count_response_" + oper.actionId + "_" + oper.operationId; let metric_labels = this.metric_labels(oper, req, request_id); // instrument and audit if (context.metrics) { context.metrics.metric(count_responses, "Counter", metric_labels); context.metrics.count(count_responses, 1, metric_labels); } } /** * generate a UUID for each request * * @param req * @param now */ request_uuid(req: Request, now: number) { let seed = os.hostname() + "_" + now; let ipv4 = req.headers["x-forwarded-for"] || req.connection.remoteAddress; return Utils.uuid(seed + ipv4 + "_" + now); } /** * Prometheus metric labels for an request / operation * * @param oper * @param req * @param request_uuid */ metric_labels(oper: IOperation, req: Request, request_id: string) { return { method: req.method, path: req.path, request_id: request_id, resource: oper.resource, operationId: oper.operationId, actionId: oper.actionId, }; } /** * inject trace into pipeline * * @param stage * @param _pipeline */ tracing( oper: Operation, stage: string, _pipeline: Function[], policyNames: [] ) { let context: IChassisContext = this.plugin.context; _pipeline.push(function (req: Request, res: Response, next: Function) { assert(req && res, "invalid request"); let _uuid = res.getHeader("X-REQUEST-ID"); context.log({ code: "api:pipeline:trace:" + stage, operationId: oper.operationId, policies: policyNames, uuid: _uuid, }); next(); }); } }