import {EventEmitter} from 'events';
import * as util from 'util';
import {getLogger} from 'pinusmod-logger';
import * as utils from '../../util/utils';
import {SID, FRONTENDID, UID} from '../../util/constants';
import {ISocket} from '../../interfaces/ISocket';
import * as path from 'path';
let logger = getLogger('pinus', path.basename(__filename));
let FRONTEND_SESSION_FIELDS = ['id', 'frontendId', 'uid', '__sessionService__'];
let EXPORTED_SESSION_FIELDS = ['id', 'frontendId', 'uid', 'settings'];
let ST_INITED = 0;
let ST_CLOSED = 1;
export interface SessionServiceOptions {
singleSession?: boolean;
}
/**
* Session service maintains the internal session for each client connection.
*
* Session service is created by session component and is only
* available in frontend servers. You can access the service by
* `app.get('sessionService')` or `app.sessionService` in frontend servers.
*
* @param {Object} opts constructor parameters
* @class
* @constructor
*/
export class SessionService {
singleSession: boolean;
sessions: { [sid: number]: Session };
uidMap: { [uid: string]: Session[] };
constructor(opts ?: SessionServiceOptions) {
opts = opts || {};
this.singleSession = opts.singleSession;
this.sessions = {}; // sid -> session
this.uidMap = {}; // uid -> sessions
}
/**
* Create and return internal session.
*
* @param {Integer} sid uniqe id for the internal session
* @param {String} frontendId frontend server in which the internal session is created
* @param {Object} socket the underlying socket would be held by the internal session
*
* @return {Session}
*
* @memberOf SessionService
* @api private
*/
create(sid: SID, frontendId: FRONTENDID, socket: ISocket) {
let session = new Session(sid, frontendId, socket, this);
this.sessions[session.id] = session;
return session;
}
/**
* Bind the session with a user id.
*
* @memberOf SessionService
* @api private
*/
bind(sid: SID, uid: UID, cb: (err: Error | null, result ?: void) => void) {
let session = this.sessions[sid];
if (!session) {
process.nextTick(function () {
cb(new Error('session does not exist, sid: ' + sid));
});
return;
}
if (session.uid) {
if (session.uid === uid) {
// already bound with the same uid
cb(null);
return;
}
// already bound with other uid
process.nextTick(function () {
cb(new Error('session has already bound with ' + session.uid));
});
return;
}
let sessions = this.uidMap[uid];
if (!!this.singleSession && !!sessions) {
process.nextTick(function () {
cb(new Error('singleSession is enabled, and session has already bound with uid: ' + uid));
});
return;
}
if (!sessions) {
sessions = this.uidMap[uid] = [];
}
for (let i = 0, l = sessions.length; i < l; i++) {
// session has binded with the uid
if (sessions[i].id === session.id) {
process.nextTick(cb);
return;
}
}
sessions.push(session);
session.bind(uid);
if (cb) {
process.nextTick(cb);
}
}
/**
* Unbind a session with the user id.
*
* @memberOf SessionService
* @api private
*/
unbind(sid: SID, uid: UID, cb: (err ?: Error, result ?: void) => void) {
let session = this.sessions[sid];
if (!session) {
process.nextTick(function () {
cb(new Error('session does not exist, sid: ' + sid));
});
return;
}
if (!session.uid || session.uid !== uid) {
process.nextTick(function () {
cb(new Error('session has not bind with ' + session.uid));
});
return;
}
let sessions = this.uidMap[uid], sess;
if (sessions) {
for (let i = 0, l = sessions.length; i < l; i++) {
sess = sessions[i];
if (sess.id === sid) {
sessions.splice(i, 1);
break;
}
}
if (sessions.length === 0) {
delete this.uidMap[uid];
}
}
session.unbind(uid);
if (cb) {
process.nextTick(cb);
}
}
/**
* Get session by id.
*
* @param {Number} id The session id
* @return {Session}
*
* @memberOf SessionService
* @api private
*/
get(sid: SID) {
return this.sessions[sid];
}
/**
* Get sessions by userId.
*
* @param {Number} uid User id associated with the session
* @return {Array} list of session binded with the uid
*
* @memberOf SessionService
* @api private
*/
getByUid(uid: UID) {
return this.uidMap[uid];
}
/**
* Remove session by key.
*
* @param {Number} sid The session id
*
* @memberOf SessionService
* @api private
*/
remove(sid: SID) {
let session = this.sessions[sid];
if (session) {
let uid = session.uid;
delete this.sessions[session.id];
let sessions = this.uidMap[uid];
if (!sessions) {
return;
}
for (let i = 0, l = sessions.length; i < l; i++) {
if (sessions[i].id === sid) {
sessions.splice(i, 1);
if (sessions.length === 0) {
delete this.uidMap[uid];
}
break;
}
}
}
}
/**
* Import the key/value into session.
*
* @api private
*/
import(sid: SID, key: string, value: string, cb: (err ?: Error, result ?: void) => void) {
let session = this.sessions[sid];
if (!session) {
utils.invokeCallback(cb, new Error('session does not exist, sid: ' + sid));
return;
}
session.set(key, value);
utils.invokeCallback(cb);
}
/**
* Import new value for the existed session.
*
* @memberOf SessionService
* @api private
*/
importAll(sid: SID, settings: { [key: string]: any }, cb: (err ?: Error, result ?: void) => void) {
let session = this.sessions[sid];
if (!session) {
utils.invokeCallback(cb, new Error('session does not exist, sid: ' + sid));
return;
}
for (let f in settings) {
session.set(f, settings[f]);
}
utils.invokeCallback(cb);
}
/**
* Kick all the session offline under the user id.
*
* @param {Number} uid user id asscociated with the session
* @param {Function} cb callback function
*
* @memberOf SessionService
*/
kick(uid: UID, reason ?: string, cb ?: (err ?: Error, result ?: void) => void) {
// compatible for old kick(uid, cb);
if (typeof reason === 'function') {
cb = reason;
reason = 'kick';
}
let sessions = this.getByUid(uid);
if (sessions) {
// notify client
let sids: SID[] = [];
let self = this;
sessions.forEach(function (session) {
sids.push(session.id);
});
sids.forEach(function (sid) {
self.sessions[sid].closed(reason);
});
process.nextTick(function () {
utils.invokeCallback(cb);
});
} else {
process.nextTick(function () {
utils.invokeCallback(cb);
});
}
}
/**
* Kick a user offline by session id.
*
* @param {Number} sid session id
* @param {Function} cb callback function
*
* @memberOf SessionService
*/
kickBySessionId(sid: SID, reason ?: string, cb ?: (err ?: Error, result ?: void) => void) {
if (typeof reason === 'function') {
cb = reason;
reason = 'kick';
}
let session = this.get(sid);
if (session) {
// notify client
session.closed(reason);
process.nextTick(function () {
utils.invokeCallback(cb);
});
} else {
process.nextTick(function () {
utils.invokeCallback(cb);
});
}
}
/**
* Get client remote address by session id.
*
* @param {Number} sid session id
* @return {Object} remote address of client
*
* @memberOf SessionService
*/
getClientAddressBySessionId(sid: SID) {
let session = this.get(sid);
if (session) {
let socket = session.__socket__;
return socket.remoteAddress;
} else {
return null;
}
}
/**
* Send message to the client by session id.
*
* @param {String} sid session id
* @param {Object} msg message to send
*
* @memberOf SessionService
* @api private
*/
sendMessage(sid: SID, msg: any) {
let session = this.sessions[sid];
if (!session) {
logger.debug('Fail to send message for non-existing session, sid: ' + sid + ' msg: ' + msg);
return false;
}
return send(this, session, msg);
}
/**
* Send message to the client by user id.
*
* @param {String} uid userId
* @param {Object} msg message to send
*
* @memberOf SessionService
* @api private
*/
sendMessageByUid(uid: UID, msg: any) {
let sessions = this.uidMap[uid];
if (!sessions) {
logger.debug('fail to send message by uid for non-existing session. uid: %j',
uid);
return false;
}
for (let i = 0, l = sessions.length; i < l; i++) {
send(this, sessions[i], msg);
}
}
/**
* Iterate all the session in the session service.
*
* @param {Function} cb callback function to fetch session
* @api private
*/
forEachSession(cb: (session: Session) => void) {
for (let sid in this.sessions) {
cb(this.sessions[sid]);
}
}
/**
* Iterate all the binded session in the session service.
*
* @param {Function} cb callback function to fetch session
* @api private
*/
forEachBindedSession(cb: (session: Session) => void) {
let i, l, sessions;
for (let uid in this.uidMap) {
sessions = this.uidMap[uid];
for (i = 0, l = sessions.length; i < l; i++) {
cb(sessions[i]);
}
}
}
/**
* Get sessions' quantity in specified server.
*
*/
getSessionsCount() {
return Object.keys(this.sessions).length;
}
akick: (uid: UID, reason ?: string) => Promise = utils.promisify(this.kick.bind(this));
akickBySessionId: (sid: SID, reason ?: string) => Promise = utils.promisify(this.kickBySessionId.bind(this));
abind: (sid: SID, uid: UID) => Promise = utils.promisify(this.bind.bind(this));
aunbind: (sid: SID, uid: UID) => Promise = utils.promisify(this.unbind.bind(this));
aimport: (sid: SID, key: string, value: any) => Promise = utils.promisify(this.import.bind(this));
aimportAll: (sid: SID, settings: any) => Promise = utils.promisify(this.importAll.bind(this));
}
/**
* Send message to the client that associated with the session.
*
* @api private
*/
let send = function (service: SessionService, session: Session, msg: any) {
session.send(msg);
return true;
};
export interface ISession {
id: number;
uid: string;
frontendId: string;
settings: { [key: string]: any };
}
/**
* Session maintains the relationship between client connection and user information.
* There is a session associated with each client connection. And it should bind to a
* user id after the client passes the identification.
*
* Session is created in frontend server and should not be accessed in handler.
* There is a proxy class called BackendSession in backend servers and FrontendSession
* in frontend servers.
*/
export class Session extends EventEmitter implements ISession {
id: SID;
frontendId: FRONTENDID;
uid: UID;
settings: { [key: string]: any };
__socket__: ISocket;
private __sessionService__: SessionService;
private __state__: number;
constructor(sid: SID, frontendId: FRONTENDID, socket: ISocket, service: SessionService) {
super();
this.id = sid; // r
this.frontendId = frontendId; // r
this.uid = null; // r
this.settings = {};
// private
this.__socket__ = socket;
this.__sessionService__ = service;
this.__state__ = ST_INITED;
}
/*
* Export current session as frontend session.
*/
toFrontendSession() {
return new FrontendSession(this);
}
/**
* Bind the session with the the uid.
*
* @param {Number} uid User id
* @api public
*/
bind(uid: UID) {
this.uid = uid;
this.emit('bind', uid);
}
/**
* Unbind the session with the the uid.
*
* @param {Number} uid User id
* @api private
*/
unbind(uid: UID) {
this.uid = null;
this.emit('unbind', uid);
}
/**
* Set values (one or many) for the session.
*
* @param {String|Object} key session key
* @param {Object} value session value
* @api public
*/
set(values: { [key: string]: any }): void;
set(key: string, value: any): void;
set(keyOrValues: string | { [key: string]: any }, value ?: any) {
if (utils.isObject(keyOrValues)) {
let values = keyOrValues as { [key: string]: any };
for (let i in values) {
this.settings[i] = values[i];
}
} else {
this.settings[keyOrValues as string] = value;
}
}
/**
* Remove value from the session.
*
* @param {String} key session key
* @api public
*/
remove(key: string) {
delete this.settings[key];
}
/**
* Get value from the session.
*
* @param {String} key session key
* @return {Object} value associated with session key
* @api public
*/
get(key: string) {
return this.settings[key];
}
/**
* Send message to the session.
*
* @param {Object} msg final message sent to client
*/
send(msg: any) {
this.__socket__.send(msg);
}
/**
* Send message to the session in batch.
*
* @param {Array} msgs list of message
*/
sendBatch(msgs: any[]) {
this.__socket__.sendBatch(msgs);
}
/**
* Closed callback for the session which would disconnect client in next tick.
*
* @api public
*/
closed(reason: string) {
if (this.__state__ === ST_CLOSED) {
return;
}
logger.debug('session on [%s] is closed with session id: %s,uid:%s,reason:%j', this.frontendId, this.id, this.uid, reason);
this.__state__ = ST_CLOSED;
this.__sessionService__.remove(this.id);
this.emit('closed', this.toFrontendSession(), reason);
this.__socket__.emit('closing', reason);
let self = this;
// give a chance to send disconnect message to client
process.nextTick(function () {
self.__socket__.disconnect();
});
}
/**
* 是否在线
*/
get isOnline(): boolean {
return this.__state__ !== ST_CLOSED;
}
/**
* 获取客户端的地址
*/
get remoteAddress() {
return this.__socket__.remoteAddress;
}
}
/**
* Frontend session for frontend server.
*/
export class FrontendSession extends EventEmitter implements ISession {
id: number;
uid: string;
frontendId: string;
settings: { [key: string]: any; };
private __session__: Session;
private __sessionService__: SessionService;
constructor(session: Session) {
super();
clone(session, this, FRONTEND_SESSION_FIELDS);
// deep copy for settings
this.settings = dclone(session.settings);
this.__session__ = session;
}
bind(uid: UID, cb: (err ?: Error, result ?: void) => void) {
let self = this;
this.__sessionService__.bind(this.id, uid, function (err) {
if (!err) {
self.uid = uid;
}
utils.invokeCallback(cb, err);
});
}
unbind(uid: UID, cb: (err ?: Error, result ?: void) => void) {
let self = this;
this.__sessionService__.unbind(this.id, uid, function (err) {
if (!err) {
self.uid = null;
}
utils.invokeCallback(cb, err);
});
}
set(key: string, value: any) {
this.settings[key] = value;
}
get(key: string) {
return this.settings[key];
}
push(key: string, cb: (err ?: Error, result ?: void) => void) {
this.__sessionService__.import(this.id, key, this.get(key), cb);
}
pushAll(cb: (err ?: Error, result ?: void) => void) {
this.__sessionService__.importAll(this.id, this.settings, cb);
}
on(event: string | symbol, listener: (...args: any[]) => void): this {
this.__session__.on(event, listener);
return super.on(event, listener);
}
abind(uid: string, ) {
return new Promise((resolve, reject) => this.bind(uid, (err, ret) => err ? reject(err) : resolve(ret as any)));
}
aunbind(uid: string, ) {
return new Promise((resolve, reject) => this.unbind(uid, (err, ret) => err ? reject(err) : resolve(ret as any)));
}
apush(key: string) {
return new Promise((resolve, reject) => this.push(key, (err, ret) => err ? reject(err) : resolve(ret as any)));
}
apushAll() {
return new Promise((resolve, reject) => this.pushAll((err, ret) => err ? reject(err) : resolve(ret as any)));
}
// abind = utils.promisify(this.bind.bind(this));
// aunbind = utils.promisify(this.unbind.bind(this));
// apush = utils.promisify(this.push.bind(this));
// apushAll = utils.promisify(this.pushAll.bind(this));
/**
* Export the key/values for serialization.
*
* @api private
*/
export() {
let res = {};
clone(this, res, EXPORTED_SESSION_FIELDS);
return res;
}
/**
* 是否在线
*/
get isOnline(): boolean {
return this.__session__.isOnline;
}
/**
* 获取客户端的地址
*/
get remoteAddress() {
return this.__session__.remoteAddress;
}
}
let clone = function (src: any, dest: any, includes: any) {
let f;
for (let i = 0, l = includes.length; i < l; i++) {
f = includes[i];
dest[f] = src[f];
}
};
let dclone = function (src: any) {
let res: any = {};
for (let f in src) {
res[f] = src[f];
}
return res;
};