/*--------------------------------------------------------------------------------------------------------------
 *  Copyright (c) insite-gmbh. All rights reserved.
 *  Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------------------------*/
import { Subject } from "rxjs/Subject";
import { Observable } from 'rxjs/Rx';
import { DataChangeEvent, IPlcEventProxy } from "../domain";

//==================================================================================================================================
/**
 * Stores the subscription details of a single subscriber together with the subject
 * that is used to push data changes to that subscriber.
 * @class PlcEventProxy
 * @author insite-gmbh
 */
class PlcEventProxy implements IPlcEventProxy {

    /**
     * @param _cleanup Delegate that is called with this instance when `dispose()` is invoked.
     * @throws a string message if the delegate is null or undefined
     *         (a plain string is kept so existing callers see the same thrown value).
     */
    constructor(private _cleanup: (instance: IPlcEventProxy) => void) {
        if (_cleanup == null)
            throw "cleanup delegate is null!"
    }

    /**
     * This is the plc group. All variables in this group are read in the same cycle and over the same connection.
     *
     * @property Group
     * @type {string}
     */
    Group: string;

    /**
     * This is the mapping in which the variables are specified.
     *
     * @property Mapping
     * @type {string}
     */
    Mapping: string;

    /**
     * This is a comma separated list of the active variables.
     *
     * @property Variables
     * @type {string}
     */
    Variables: string;

    /**
     * Signals if the variables are absolute addresses or symbolic variable names.
     *
     * @property Absolute
     * @type {boolean}
     */
    Absolute: boolean;

    /**
     * This is the subject the data change events are published on.
     *
     * @property Subject
     * @type {Subject<DataChangeEvent>}
     */
    Subject: Subject<DataChangeEvent>;

    /**
     * You can call subscribe on this property to get informed about updates.
     *
     * @property ChangeEvent
     * @type {Observable<DataChangeEvent>}
     */
    get ChangeEvent(): Observable<DataChangeEvent> {
        return this.Subject.asObservable();
    }

    /**
     * Hands this proxy to the cleanup delegate given to the constructor.
     * The factory in this file passes a delegate that unregisters the proxy.
     */
    dispose() {
        if (this._cleanup != null) {
            this._cleanup(this);
        }
    }
}

//==================================================================================================================================
/**
 * This class handles the proxy generation for the plc signalR hub
 * @class PlcEventProxyFactory
 * @author insite-gmbh
 */
export class PlcEventProxyFactory {

    /**
     * This holds the list of proxies for change events
     *
     * @property _proxies
     * @type {Array<PlcEventProxy>}
     * @private
     */
    private _proxies = new Array<PlcEventProxy>();

    /**
     * Registers a "DataChanged" handler on the hub proxy which forwards every incoming
     * change to all proxies that were created for the same group and mapping.
     *
     * @param {any} _hubProxy Instance of the plc hub proxy; it is used through `on(...)` and `invoke(...)`.
     * @throws a string message if the hub proxy is null or undefined
     */
    public constructor(private _hubProxy: any) {
        if (_hubProxy == null)
            throw "plcHubProxy is null!";

        this._hubProxy.on("DataChanged", (group: string, mapping: string, data: any) => {
            let dce = new DataChangeEvent();
            dce.Group = group;
            dce.Mapping = mapping;
            dce.Data = data;

            // Iterate over a snapshot: a subscriber may dispose its proxy from inside the
            // callback, which can splice _proxies synchronously and would otherwise make
            // forEach skip the following proxy.
            this._proxies.slice().forEach(channelSub => {
                if (channelSub.Group === group && channelSub.Mapping === mapping) {
                    channelSub.Subject.next(dce);
                }
            });
        })
    }

    /**
     * Create a proxy to subscribe to plc data changes of the given variables.
     * (According to the original documentation an initial data change is fired after the
     * subscription; that is server behavior and cannot be verified from this file.)
     * @method create
     * @param {string} group        Group name for the callbacks, so you can register variables for this group from different places,
     *                              and every watcher of this group gets an update
     * @param {string} plcId        This id specifies a single plc, the id has to be the same as specified in the
     *                              INAX config (with different ids you could address different plcs or connections)
     * @param {string} mapping      Mapping name of the variables. Every symbolic variable has to have a mapping specification
     *                              which specifies the area, offset,.. of the variable
     * @param {Array<string>} variables  Names of the symbolic/absolute variables
     * @param {boolean} abs         Switch between absolute and symbolic variables
     * @return {IPlcEventProxy}     returns an object with the registered properties to subscribe on changes.
     */
    public create(group: string, plcId: string, mapping: string, variables: Array<string>, abs: boolean = false): IPlcEventProxy {
        let vars = variables.join(",");

        // Create the internal object which tracks this subscription; its dispose()
        // is routed back to release() of this factory.
        let channelSub = new PlcEventProxy((sub) => this.release(sub));
        channelSub.Mapping = `${plcId}.${mapping}`;
        channelSub.Group = group;
        channelSub.Variables = vars;
        channelSub.Absolute = abs;
        channelSub.Subject = new Subject<DataChangeEvent>();

        // save channel to subscribers, to broadcast events on data changes
        this._proxies.push(channelSub);

        // The hub call is asynchronous: success is only logged, a failure is
        // reported to the subscriber through the error channel of the subject.
        this._hubProxy.invoke("Subscribe", channelSub.Group, channelSub.Mapping, channelSub.Variables, channelSub.Absolute)
            .done(() => {
                console.log(`Successfully subscribed to mapping ${mapping} and variable ${vars}`);
            })
            .fail((error: any) => {
                channelSub.Subject.error(error);
            });

        return channelSub;
    }

    /**
     * Cleanup the given proxy: unsubscribe the variables which are not used by any other proxy
     * of the same group and mapping, then complete the subject and remove the proxy from the list.
     * If no variable has to be unsubscribed, the hub is not called at all.
     * @method release
     * @param {IPlcEventProxy} proxy  The proxy which was returned by `create`.
     */
    private release(proxy: IPlcEventProxy): void {
        let channelSub = proxy as PlcEventProxy;
        let removableVariables = this.getRemovableVariables(channelSub);

        if (removableVariables.length <= 0) {
            this.completeSubscription(channelSub);
            return;
        }

        // Only unsubscribe the variables nobody else is watching; sending the full list
        // would cut off other proxies which share variables with this one.
        this._hubProxy.invoke("Unsubscribe", channelSub.Group, channelSub.Mapping, removableVariables, channelSub.Absolute)
            .done(() => {
                this.completeSubscription(channelSub);
            })
            .fail((error: any) => {
                console.log(`Error usubscribed for group ${channelSub.Group}, mapping ${channelSub.Mapping} and variables ${channelSub.Variables} error wa ${error}`);
                // NOTE(review): the proxy stays in _proxies on this path - confirm whether that is intended.
                channelSub.Subject.error(error);
            });
    }

    /**
     * Complete the subject of the given proxy and remove the proxy from the internal list.
     * If the proxy is not in the list, this is logged and nothing else happens.
     */
    private completeSubscription(proxy: PlcEventProxy) {
        proxy.Subject.complete();

        let idx = this._proxies.indexOf(proxy);
        if (idx != -1)
            this._proxies.splice(idx, 1);
        else {
            console.log(`Subscriber for group ${proxy.Group}, mapping ${proxy.Mapping} and variable ${proxy.Variables} not found after unsubscribe!`);
            return;
        }

        console.log(`Successfully unsubscribed from group ${proxy.Group}, mapping ${proxy.Mapping} and variable ${proxy.Variables}`);
    }

    /**
     * Get all variables of the given proxy which have no other subscriber
     * (no other proxy with the same group and mapping lists them).
     * @return {string} comma separated list of these variables, empty if there are none.
     */
    private getRemovableVariables(proxy: PlcEventProxy): string {
        let vars = proxy.Variables.split(",");

        this._proxies.forEach(channelSub => {
            if (proxy !== channelSub &&                     // do not test against ourselves
                channelSub.Group === proxy.Group &&
                channelSub.Mapping === proxy.Mapping) {
                // drop every variable (including duplicates) which also exists in the other subscription
                let usedElsewhere = channelSub.Variables.split(",");
                vars = vars.filter(variable => usedElsewhere.indexOf(variable) === -1);
            }
        });

        return vars.join(",");
    }
}