import EventBus from 'vertx3-eventbus-client'; import { ChatApi, ChatNewAttachment, ChatAttachment, ChatNewInfo, ChatInfo, ChatNewMessageTypeEnum, ChatInfoList, ChatMessageList, ChatNewMessage, ChatMessage, ChatAckStatusEnum } from '@apsel/apsel-api-ts-axios'; import { AxiosInstance } from 'axios'; import global_axios from 'axios'; import { MessageList, Message, Attachment } from './type'; import { ChatUserInfo } from '@apsel/apsel-api-ts-axios'; export interface ChatEngineConfig { store: any; axios?: AxiosInstance; busURI?: string; busPingInterval?: number; } export interface MessageListDict { [details: string]: MessageList; } export default class ChatEngine { private config: ChatEngineConfig = { store: {}, axios: global_axios, busURI: 'http://localhost:9991/events', busPingInterval: 10000, }; private eventBus = undefined; /** Data */ private chatList: ChatInfoList = { chats: [] }; private chatMessageList: MessageListDict = {}; /** * Instanciate with appropriate config */ constructor(config: ChatEngineConfig | undefined) { if (config) { this.config = { ...this.config, ...config}; } } public clear() { try { console.log('[ChatEngine] Clear'); this.chatList.chats.splice(0); for (const x in this.chatMessageList) { if (typeof(x) === 'string') { delete this.chatMessageList[x]; } } this.config.store.commit('totalUnreadMessage', 0); } catch (e) { console.error('[ChatEngine] clear error', e); } } get url() { return this.config.store.state.auth.url; } get userId() { return this.config.store.state.auth.userId; } get sessionId() { return this.config.store.state.auth.sessionId; } get accessToken() { return this.config.store.state.auth.accessToken; } get refreshToken() { return this.config.store.state.auth.refreshToken; } get chatApi() { return new ChatApi({ accessToken: this.accessToken, }, this.url, this.config.axios); } /** * Build the URl for an attachment using the path */ public urlForAttachment(att?: Attachment) { // RULE: New message don't have a 'path' (len 0) // but they may have an objectUrl if (att != null) { let url = null; if (att.objectUrl) { url = att.objectUrl; } else { url = this.url + att.path; } return url; } } /** * Begin listening for chats event */ public async start() { console.info('[ChatEngine] Start called'); // this.setState('Disconnected', 'Connectings'); if (this.eventBus) { console.error('[ChatEngine] Already started'); return; } const eventBus = new EventBus(this.config.busURI, { vertxbus_ping_interval: this.config.busPingInterval}); eventBus.enableReconnect(true); const _ = this; eventBus.onopen = () => { console.log('[ChatEngine] Event-bus Connected'); // this.setState('Connected', 'Running'); // Try to register ont he event queue eventBus.registerHandler('chat/user/' + _.userId, { accessToken: _.accessToken }, (error: any, message: any) => { console.log('[ChatEngine] New Event', JSON.stringify(message)); if (message.body && message.body.source === 'chat') { if (message.body.event === 'new_message') { this.busHandler_NewChatMessage(message.body.data); this.updateUnreadCount(); } else if (message.body.event === 'new_chat') { this.busHandler_NewChat(message.body.data); this.updateUnreadCount(); } else if (message.body.event === 'update_message') { this.busHandler_UpdateChatMessage(message.body.data); this.updateUnreadCount(); } } }); }; // Reconnect handler eventBus.onreconnect = () => { console.log('[ChatEngine] Re-Connected'); // this.setState('Reconnected', ''); }; // handle close eventBus.onclose = (param: any) => { const reason = (param && 'reason' in param) ? param.reason : 'Unknown'; // this.setState('Disconnected', reason); console.log('[ChatEngine] Disconnected: ' + reason); }; this.eventBus = eventBus; } public async busHandler_NewChat(new_chat: any) { /* New Event { "type":"rec", "address":"chat/user/7e0ade97-37af-43bc-98e9-577891b2f33d", "body":{ "source":"chat", "event":"new_chat", "data":{ "chatId":"40def5e1-2a11-408d-b4a6-69fbe393d861", "from":"8bedaa5a-3db9-4fcc-825a-2c3156b0dcfb", "to":"7e0ade97-37af-43bc-98e9-577891b2f33d" } } } */ const chat = await this.getChat(new_chat.chatId); console.log('[ChatEngine] Chat loaded from service bus event', chat); } public async busHandler_UpdateChatMessage(message: any) { /* {"type":"rec","address":"chat/user/ba1e5194-d919-48a0-a50f-d9128ad476f2","body":{"source":"chat","event":"update_message","data":{"chatId":"11eadb45-9259-48e5-9118-1fb46554d797","id":"1723a6f8-bf86-435d-8482-4d4a90eb9bf0","type":"text","status":"received","timestamp":"2019-05-30T07:52:53","from":"ba1e5194-d919-48a0-a50f-d9128ad476f2","to":"50911b98-2525-453b-91f9-4c6632e18b92","content":"df","attachment":null,"replyTo":null,"actions":null,"enabled":true}}} */ // Create a new Message const m = new Message( message.id, message.enabled, /* isEnabled */ message.type, message.status, message.timestamp, message.content, message.attachment, message.from, message.to, message.replyTo, message.actions, ); if (this.chatMessageList[message.chatId]) { console.info('[ChatEngine] Adding message received by event bus', m); this.chatMessageList[message.chatId].updateMessage(m); } } public async busHandler_NewChatMessage(new_message: any) { /* New Event {"type":"rec", "address":"chat/user/7e0ade97-37af-43bc-98e9-577891b2f33d", "body":{ "source":"chat", "event":"new_message", "data":{ "chatId":"d41cf15d-b36f-49de-afcd-df3f6ee3b122", "id":"6a134676-bd69-4469-9502-dcf85258506d", "type":"text", "status":"", "timestamp":"2019-05-30T04:34:19", "from":"8bedaa5a-3db9-4fcc-825a-2c3156b0dcfb", "to":"7e0ade97-37af-43bc-98e9-577891b2f33d", "content":"sdfasd", "attachment":null, "replyTo":null, "actions":null, "enabled":true} }} */ // Create a new Message const m = new Message( new_message.id, new_message.enabled, /* isEnabled */ new_message.type, new_message.status, new_message.timestamp, new_message.content, new_message.attachment, new_message.from, new_message.to, new_message.replyTo, new_message.actions, ); if (this.chatMessageList[new_message.chatId]) { console.info('[ChatEngine] Adding message received by event bus', m); this.chatMessageList[new_message.chatId].addMessage(m); } // RULE: If that message is to the current user and it was not yet acknowledged // acknowledge that the message was received if (new_message.to === this.userId && new_message.status !== ChatAckStatusEnum.Received && new_message.status !== ChatAckStatusEnum.Read ) { console.info('[ChatEngine] Ack message=received', m); this.chatApi.acknowledgeMessage(new_message.chatId, new_message.id, { status: ChatAckStatusEnum.Received }); } // If we don't have the messages we reload the chat to get a proper // unread message count if (!this.chatMessageList[new_message.chatId]) { await this.getChat(new_message.chatId, true); } } /** * Stop listenning for chat event and close the socket */ public stop() { console.info('[ChatEngine] Stop called'); let eventBus = this.eventBus; if (eventBus) { // @ts-ignore eventBus.close(); } eventBus = undefined; } /** * Create a new chat with another person */ public async createChat(to: string, meta?: string): Promise { // const chat = (await this.chatApi.createChat({ to, meta, })).data; // save the chat in the list // check if it's not already there const i = this.chatList.chats.findIndex( c => c.id == chat.id ) if (i < 0 ) this.chatList.chats.push(chat); else this.chatList.chats[i] = chat return chat; } /** * Create a new chat with another person */ public async createProductChat(productId: string): Promise { // const chat = (await this.chatApi.createChatForProduct(productId)).data; // save the chat in the list // check if it's not already there const i = this.chatList.chats.findIndex( c => c.id == chat.id ) if (i < 0 ) this.chatList.chats.push(chat); else this.chatList.chats[i] = chat return chat; } /** * Load the list of chat */ public async loadChatList(): Promise { try { let chatList = (await this.chatApi.listChat(this.userId)).data; // Sort the chat list by last message or if no message by date created chatList = chatList.chats.sort( (a, b) => { const adate = a.lastMessageDate || a.startDate; const bdate = b.lastMessageDate || b.startDate; if (adate < bdate) { return 1; } else { return -1; } }); // remove everything from the current array while (this.chatList.chats.length > 0) { this.chatList.chats.pop(); } // Add everything we received console.info('[ChatEngine] List of chat loaded', chatList); this.chatList.chats.push(...chatList); // Update the count this.updateUnreadCount(); return this.chatList; } catch (e) { console.error('[ChatEngine] Unable to load chat list for "' + this.userId + '"', e); } return undefined; } // Returns true is the message is sent by the current user public isMine(obj: { from: string | ChatUserInfo }): boolean { if (typeof(obj.from) === 'object') { return (obj.from.id === this.userId); } return (obj.from === this.userId); } public updateUnreadCount(): any { console.log('[ChatEngine] Updating unread count'); let totalUnreadCount = 0; // Iterate over all the chats this.chatList.chats.forEach( (chat) => { console.log(`[ChatEngine] update count. party1UnreadCount: ${chat.party1UnreadCount}, party2UnreadCount:${chat.party2UnreadCount}`) // Get how many message are unread as per the chat object let unreadCount = this.isMine(chat) ? chat.party1UnreadCount : chat.party2UnreadCount; //let unreadCountMessage = -1; // Check if the message list has been loaded // if yes, count the number of messages that are marked as unread console.log('[ChatEngine] update count. ChatMessageList', this.chatMessageList); if (chat.id in this.chatMessageList) { console.log(`[ChatEngine] update count. Message List found for chat ${chat.id}`); const messages = this.chatMessageList[chat.id]; unreadCount = 0; for (const message of messages) { // console.log(`[ChatEngine] update count. chat: ${chat.id}`, chat); if (!this.isMine(message) && message.status !== ChatAckStatusEnum.Read) { unreadCount++; } } // new_message.status !== ChatAckStatusEnum.Read } totalUnreadCount += unreadCount; }); console.log(`[ChatEngine] update count. totalUnreadCount:${totalUnreadCount}`); // Update the store try { this.config.store.commit('totalUnreadMessage', totalUnreadCount); } catch (e) { console.error('Missing mutation `totalUnreadMessage`'); } } /** * Get the list of chat instance */ public getChatList(): ChatInfoList { // Return the list immediately and load it async setTimeout( () => { this.loadChatList(); }, 1); return this.chatList; } /** * Get a chat */ public async getChat(chatId: string, reload?: boolean): Promise { console.info(`[ChatEngine] getChat ${chatId}, reload: ${reload}`); // search the cache try { let chat = null; const chatIndex = this.chatList.chats.findIndex( (c: ChatInfo) => c.id === chatId ); if (chatIndex < 0 || reload) { console.info('[ChatEngine] loading chat "' + chatId + '"'); chat = (await this.chatApi.getChat(chatId)).data; // Replace or Add the chat to our list if (chatIndex >= 0) { this.chatList.chats[chatIndex] = chat; } else { this.chatList.chats.push(chat); } console.info('[ChatEngine] Chat loaded "' + chatId + '"'); } else { chat = this.chatList.chats[chatIndex]; } console.info(`[ChatEngine] getChat result ${chatId}:`, chat); return chat; } catch (e) { console.error('[ChatEngine] chat not found "' + chatId + '"', e); } return undefined; } public async updateMessageList(chatId: string) { try { if (chatId in this.chatMessageList) { const messageList = this.chatMessageList[chatId]; const result = (await this.chatApi.listMessage(chatId)).data; // @ts-ignore messageList.addMessages(result.messages); // we need to update the chat list } } catch (e) { console.error('[ChatEngine] failed to update chat message list"' + chatId + '"', e); } } /** * Get a chat */ public async getMessageList(chatId: string): Promise { // Search the cache try { let messageList: MessageList; if (chatId in this.chatMessageList) { messageList = this.chatMessageList[chatId]; // we need to update the chat list; let it run async this.updateMessageList(chatId); } else { // Load the list from the backend const result = (await this.chatApi.listMessage(chatId)).data; console.log('[ChatEngine] loaded message list', result); // @ts-ignore§ messageList = new MessageList(result.messages); } // Store the messages if (messageList) { this.chatMessageList[chatId] = messageList; } return messageList; } catch (e) { console.error('[ChatEngine] failed to load chat message list"' + chatId + '"', e); } return undefined; } /** * Send a message */ public async sendMessage(chatId: string, chatNewMessage: ChatNewMessage): Promise { try { const messageList = await this.getMessageList(chatId); if (!messageList) { throw new Error('unable to load message list'); } // We have two ways to post a message: with or without attachement for not other // reason that we couldn't figure out how to set up the openapi description in // a way that is undertood by all code generators heck.. if (chatNewMessage.attachment && chatNewMessage.attachment.length > 0) { const attachment = chatNewMessage.attachment[0]; // @ts-ignore-line if (!attachment.blob!) { throw new Error('[ChatEngine] The attachment does not contain a blob, cannot save'); } console.info('[ChatEngine] Saving message with attachment'); const formData = new FormData(); formData.append('message', JSON.stringify(chatNewMessage)); // @ts-ignore-line formData.append('file', attachment.blob); const message = (await this.config.axios!.post( this.url + '/chats/' + chatId + '/messagesfile/', formData, { headers: { 'Content-Type': 'multipart/form-data', 'Authorization': 'Bearer ' + this.accessToken, }, }, )).data; messageList.addMessage(message); return message; } else { console.info('[ChatEngine] Saving message without attachment'); const message = (await this.chatApi.createMessage(chatId, chatNewMessage)).data; messageList.addMessage(message as Message); return message; } } catch (e) { console.error('[ChatEngine] failed to send message "' + chatId + '"', e); } } /** Mark a message as read if it's for the current user */ public async markAsRead(chatId: string, message: ChatMessage): Promise { if ( // Check if that message is for the current user // since he will be the only one allowed to change the status // we avoid spamming the server with useless messages message.to === this.userId && // Only update if the status is not already "Read" // also to minimize server spamming message.status !== ChatAckStatusEnum.Read) { const r = (await this.chatApi.acknowledgeMessage(chatId, message.id, {status: ChatAckStatusEnum.Read})).data; return r.success || false; } return false; } }