im-client.js

import EventEmitter from 'eventemitter3';
import d from 'debug';
import Conversation from './conversation';
import ConversationQuery from './conversation-query';
import {
  GenericCommand,
  SessionCommand,
  ConvCommand,
  AckCommand,
  JsonObjectMessage,
  ReadCommand,
  ReadTuple,
  CommandType,
  OpType,
} from '../proto/message';
import { ErrorCode } from './error';
import { tap, Expirable, Cache, keyRemap, union, difference, trim, internal, throttle } from './utils';
import { applyDecorators, applyDispatcher } from './plugin';
import runSignatureFactory from './signature-factory-runner';
import { MessageStatus } from './messages/message';
import { version as VERSION } from '../package.json';

const debug = d('LC:IMClient');

export default class IMClient extends EventEmitter {
  /**
   * 无法直接实例化,请使用 {@link Realtime#createIMClient} 创建新的 IMClient。
   *
   * @extends EventEmitter
   * @param  {String} [id] 客户端 id
   * @param  {Object} [options]
   * @param  {Function} [options.signatureFactory] open session 时的签名方法 // TODO need details
   * @param  {Function} [options.conversationSignatureFactory] 对话创建、增减成员操作时的签名方法
   */
  constructor(id, options = {}, connection, props) {
    if (!(id === undefined || typeof id === 'string')) {
      throw new TypeError(`Client id [${id}] is not a String`);
    }
    super();
    Object.assign(this, {
      /**
       * @var id {String} 客户端 id
       * @memberof IMClient#
       */
      id,
      _connection: connection,
      options,
    }, props);

    if (!this._messageParser) {
      throw new Error('IMClient must be initialized with a MessageParser');
    }
    this._conversationCache = new Cache(`client:${this.id}`);
    this._ackMessageBuffer = {};
    internal(this).lastPatchTime = Date.now();
    internal(this)._eventemitter = new EventEmitter();
    [
      'invited',
      'kicked',
      'membersjoined',
      'membersleft',
      'message',
      'unreadmessages',
      'unreadmessagescountupdate',
      'close',
      'conflict',
      'unhandledmessage',
      'reconnect',
      'reconnecterror',
    ].forEach(event => this.on(
      event,
      (...payload) => this._debug(`${event} event emitted. %O`, payload)
    ));
    // onIMClientCreate hook
    applyDecorators(this._plugins.onIMClientCreate, this);
  }

  _debug(...params) {
    debug(...params, `[${this.id}]`);
  }

  /**
   * @override
   * @private
   */
  _dispatchCommand(command) {
    this._debug(trim(command), 'received');
    switch (command.cmd) {
      case CommandType.conv:
        return this._dispatchConvMessage(command);
      case CommandType.direct:
        return this._dispatchDirectMessage(command);
      case CommandType.session:
        return this._dispatchSessionMessage(command);
      case CommandType.unread:
        return this._dispatchUnreadMessage(command);
      case CommandType.rcp:
        return this._dispatchRcpMessage(command);
      case CommandType.patch:
        return this._dispatchPatchMessage(command);
      default:
        this.emit('unhandledmessage', command);
        return Promise.resolve();
    }
  }

  _dispatchSessionMessage(message) {
    const {
      sessionMessage: {
        code, reason,
      },
    } = message;
    switch (message.op) {
      case OpType.closed: {
        if (code === ErrorCode.SESSION_CONFLICT) {
          /**
           * 用户在其他客户端登录,当前客户端被服务端强行下线。详见文档「单点登录」章节。
           * @event IMClient#conflict
           */
          return this.emit('conflict', {
            reason,
          });
        }
        /**
         * 当前客户端被服务端强行下线
         * @event IMClient#close
         * @param {Object} payload
         * @param {Number} payload.code 错误码
         * @param {String} payload.reason 原因
         */
        return this.emit('close', {
          code, reason,
        });
      }
      default:
        this.emit('unhandledmessage', message);
        return Promise.reject(new Error('Unrecognized session command'));
    }
  }

  _dispatchUnreadMessage({
    unreadMessage: {
      convs,
      notifTime,
    },
  }) {
    internal(this).lastUnreadNotifTime = notifTime;
    // ensure all converstions are cached
    return this.getConversations(convs.map(conv => conv.cid)).then(() =>
      // update conversations data
      Promise.all(convs.map(({
          cid,
          unread,
          mid,
          timestamp: ts,
          from,
          data,
          patchTimestamp,
        }) => this.getConversation(cid).then((conversation) => {
          // deleted conversation
          if (!conversation) return null;
          let timestamp;
          if (ts) {
            timestamp = new Date(ts.toNumber());
            conversation.lastMessageAt = timestamp; // eslint-disable-line no-param-reassign
          }
          return (mid ? this._messageParser.parse(data).then((message) => {
            const messageProps = {
              id: mid,
              cid,
              timestamp,
              from,
            };
            if (patchTimestamp) {
              messageProps.updatedAt = new Date(patchTimestamp.toNumber());
            }
            Object.assign(message, messageProps);
            conversation.lastMessage = message; // eslint-disable-line no-param-reassign
          }) : Promise.resolve()).then(() => {
            const countNotUpdated = unread === internal(conversation).unreadMessagesCount;
            if (countNotUpdated) return null; // to be filtered
            // manipulate internal property directly to skip unreadmessagescountupdate event
            internal(conversation).unreadMessagesCount = unread;
            /**
             * 未读消息数目更新
             * @event IMClient#unreadmessages
             * @deprecated 请使用新的未读消息数目批量更新事件 {@link IMClient#event:unreadmessagescountupdate}
             * @param {Object} payload
             * @param {Number} payload.count 未读消息数
             * @param {String} [payload.lastMessageId] 最新一条未读消息 id
             * @param {Date} [payload.lastMessageTimestamp] 最新一条未读消息时间戳
             * @param {Conversation} conversation 未读消息数目有更新的对话
             */
            this.emit('unreadmessages', {
              count: unread,
              lastMessageId: mid,
              lastMessageTimestamp: timestamp,
            }, conversation);
            return conversation;
          });
        })
      // filter conversations without unread count update
      )).then(conversations => conversations.filter(conversation => conversation))
    ).then((conversations) => {
      if (conversations.length) {
        /**
         * 未读消息数目更新
         * @event IMClient#unreadmessagescountupdate
         * @since 3.4.0
         * @param {Conversation[]} conversations 未读消息数目有更新的对话列表
         */
        this.emit('unreadmessagescountupdate', conversations);
      }
    });
  }

  _dispatchRcpMessage(message) {
    const {
      rcpMessage,
      rcpMessage: {
        read,
      },
    } = message;
    const conversationId = rcpMessage.cid;
    const messageId = rcpMessage.id;
    const timestamp = new Date(rcpMessage.t.toNumber());
    const conversation = this._conversationCache.get(conversationId);
    // conversation not cached means the client does not send the message
    // during this session
    if (!conversation) return;
    conversation._handleReceipt({ messageId, timestamp, read });
  }

  _dispatchPatchMessage({
      patchMessage: {
        patches,
      },
    }) {
    // ensure all converstions are cached
    return this.getConversations(patches.map(patch => patch.cid)).then(() =>
      Promise.all(patches.map(({
        cid, mid, timestamp, recall, data, patchTimestamp, from,
      }) =>
        this.getConversation(cid).then((conversation) => {
          // deleted conversation
          if (!conversation) return null;
          return this._messageParser.parse(data).then((message) => {
            const patchTime = patchTimestamp.toNumber();
            const messageProps = {
              id: mid,
              cid,
              timestamp: new Date(timestamp.toNumber()),
              updatedAt: new Date(patchTime),
              from,
            };
            Object.assign(message, messageProps);
            message._setStatus(MessageStatus.SENT);
            if (internal(this).lastPatchTime < patchTime) {
              internal(this).lastPatchTime = patchTime;
            }
            // update conversation lastMessage
            if (conversation.lastMessage && conversation.lastMessage.id === mid) {
              conversation.lastMessage = message; // eslint-disable-line no-param-reassign
            }
            if (recall) {
              /**
               * 消息被撤回
               * @event IMClient#messagerecall
               * @param {AVMessage} message 被撤回的消息
               * @param {Conversation} conversation 消息所在的会话
               */
              this.emit('messagerecall', message, conversation);
              /**
               * 消息被撤回
               * @event Conversation#messagerecall
               * @param {AVMessage} message 被撤回的消息
               */
              conversation.emit('messagerecall', message);
            } else {
              /**
               * 消息被修改
               * @event IMClient#messageupdate
               * @param {AVMessage} message 被修改的消息
               * @param {Conversation} conversation 消息所在的会话
               */
              this.emit('messageupdate', message, conversation);
              /**
               * 消息被修改
               * @event Conversation#messageupdate
               * @param {AVMessage} message 被修改的消息
               */
              conversation.emit('messageupdate', message);
            }
          });
        })
      ))
    );
  }

  _dispatchConvMessage(message) {
    const {
      convMessage,
      convMessage: {
        initBy, m,
      },
    } = message;
    switch (message.op) {
      case OpType.joined: {
        return this.getConversation(convMessage.cid).then((conversation) => {
          if (!conversation.transient) {
            // eslint-disable-next-line no-param-reassign
            conversation.members = union(conversation.members, [this.id]);
          }
          const payload = {
            invitedBy: initBy,
          };
          /**
           * 当前用户被添加至某个对话
           * @event IMClient#invited
           * @param {Object} payload
           * @param {String} payload.invitedBy 邀请者 id
           * @param {Conversation} conversation
           */
          this.emit('invited', payload, conversation);
          /**
           * 当前用户被添加至当前对话
           * @event Conversation#invited
           * @param {Object} payload
           * @param {String} payload.invitedBy 该移除操作的发起者 id
           */
          conversation.emit('invited', payload);
        });
      }
      case OpType.left: {
        return this.getConversation(convMessage.cid).then((conversation) => {
          if (!conversation.transient) {
            // eslint-disable-next-line no-param-reassign
            conversation.members = difference(conversation.members, [this.id]);
          }
          const payload = {
            kickedBy: initBy,
          };
          /**
           * 当前用户被从某个对话中移除
           * @event IMClient#kicked
           * @param {Object} payload
           * @param {String} payload.kickedBy 该移除操作的发起者 id
           * @param {Conversation} conversation
           */
          this.emit('kicked', payload, conversation);
          /**
           * 当前用户被从当前对话中移除
           * @event Conversation#kicked
           * @param {Object} payload
           * @param {String} payload.kickedBy 该移除操作的发起者 id
           */
          conversation.emit('kicked', payload);
        });
      }
      case OpType.members_joined: {
        return this.getConversation(convMessage.cid).then((conversation) => {
          if (!conversation.transient) {
            // eslint-disable-next-line no-param-reassign
            conversation.members = union(conversation.members, convMessage.m);
          }
          const payload = {
            invitedBy: initBy,
            members: m,
          };
          /**
           * 有用户被添加至某个对话
           * @event IMClient#membersjoined
           * @param {Object} payload
           * @param {String[]} payload.members 被添加的用户 id 列表
           * @param {String} payload.invitedBy 邀请者 id
           * @param {Conversation} conversation
           */
          this.emit('membersjoined', payload, conversation);
          /**
           * 有成员被添加至当前对话
           * @event Conversation#membersjoined
           * @param {Object} payload
           * @param {String[]} payload.members 被添加的成员 id 列表
           * @param {String} payload.invitedBy 邀请者 id
           */
          conversation.emit('membersjoined', payload);
        });
      }
      case OpType.members_left: {
        return this.getConversation(convMessage.cid).then((conversation) => {
          if (!conversation.transient) {
            // eslint-disable-next-line no-param-reassign
            conversation.members = difference(conversation.members, convMessage.m);
          }
          const payload = {
            kickedBy: initBy,
            members: m,
          };
          /**
           * 有成员被从某个对话中移除
           * @event IMClient#membersleft
           * @param {Object} payload
           * @param {String[]} payload.members 被移除的成员 id 列表
           * @param {String} payload.kickedBy 该移除操作的发起者 id
           * @param {Conversation} conversation
           */
          this.emit('membersleft', payload, conversation);
          /**
           * 有成员被从当前对话中移除
           * @event Conversation#membersleft
           * @param {Object} payload
           * @param {String[]} payload.members 被移除的成员 id 列表
           * @param {String} payload.kickedBy 该移除操作的发起者 id
           */
          conversation.emit('membersleft', payload);
        });
      }
      default:
        this.emit('unhandledmessage', message);
        return Promise.reject(new Error('Unrecognized conversation command'));
    }
  }

  _dispatchDirectMessage(originalMessage) {
    const {
      directMessage,
      directMessage: {
        id, cid, fromPeerId, timestamp, transient, patchTimestamp,
      },
    } = originalMessage;
    return Promise.all([
      this.getConversation(directMessage.cid),
      this._messageParser.parse(directMessage.msg),
    ]).then(([conversation, message]) => {
      // deleted conversation
      if (!conversation) return undefined;
      const messageProps = {
        id,
        cid,
        timestamp: new Date(timestamp.toNumber()),
        from: fromPeerId,
        transient,
      };
      if (patchTimestamp) {
        messageProps.updatedAt = new Date(patchTimestamp.toNumber());
      }
      Object.assign(message, messageProps);
      message._setStatus(MessageStatus.SENT);
      return this._dispatchParsedMessage(message, conversation);
    });
  }

  _dispatchParsedMessage(message, conversation) {
    // beforeMessageDispatch hook
    return applyDispatcher(this._plugins.beforeMessageDispatch, [message, conversation])
      .then((shouldDispatch) => {
        if (shouldDispatch === false) return;
        conversation.lastMessage = message; // eslint-disable-line no-param-reassign
        conversation.lastMessageAt = message.timestamp; // eslint-disable-line no-param-reassign
        // filter outgoing message sent from another device
        if (message.from !== this.id) {
          conversation.unreadMessagesCount += 1; // eslint-disable-line no-param-reassign
          if (!(message.transient || conversation.transient)) {
            this._sendAck(message);
          }
        }
        /**
         * 当前用户收到消息
         * @event IMClient#message
         * @param {Message} message
         * @param {Conversation} conversation 收到消息的对话
         */
        this.emit('message', message, conversation);
        /**
         * 当前对话收到消息
         * @event Conversation#message
         * @param {Message} message
         */
        conversation.emit('message', message);
      });
  }

  _sendAck(message) {
    this._debug('send ack for %O', message);
    const { cid } = message;
    if (!cid) {
      throw new Error('missing cid');
    }
    if (!this._ackMessageBuffer[cid]) {
      this._ackMessageBuffer[cid] = [];
    }
    this._ackMessageBuffer[cid].push(message);
    return this._doSendAck();
  }

  // jsdoc-ignore-start
  @throttle(1000)
  // jsdoc-ignore-end
  _doSendAck() {
    // if not connected, just skip everything
    if (!this._connection.is('connected')) return;
    this._debug('do send ack %O', this._ackMessageBuffer);
    Promise.all(Object.keys(this._ackMessageBuffer).map((cid) => {
      const convAckMessages = this._ackMessageBuffer[cid];
      const timestamps = convAckMessages.map(message => message.timestamp);
      const command = new GenericCommand({
        cmd: 'ack',
        ackMessage: new AckCommand({
          cid,
          fromts: Math.min.apply(null, timestamps),
          tots: Math.max.apply(null, timestamps),
        }),
      });
      delete this._ackMessageBuffer[cid];
      return this._send(command, false).catch((error) => {
        this._debug('send ack failed: %O', error);
        this._ackMessageBuffer[cid] = convAckMessages;
      });
    }));
  }

  _send(cmd, ...args) {
    const command = cmd;
    if (this.id) {
      command.peerId = this.id;
    }
    return this._connection.send(command, ...args);
  }

  _open(appId, tag, deviceId, isReconnect = false) {
    this._debug('open session');
    const {
      lastUnreadNotifTime,
      lastPatchTime,
    } = internal(this);
    return Promise
      .resolve(new GenericCommand({
        cmd: 'session',
        op: 'open',
        appId,
        sessionMessage: new SessionCommand({
          ua: `js/${VERSION}`,
          r: isReconnect,
          lastUnreadNotifTime,
          lastPatchTime,
          configBitmap: 1,
        }),
      }))
      .then((command) => {
        if (isReconnect) {
          // if sessionToken is not expired, skip signature/tag/deviceId
          const sessionToken = internal(this).sessionToken;
          if (sessionToken) {
            const value = sessionToken.value;
            if (value && value !== Expirable.EXPIRED) {
              Object.assign(command.sessionMessage, {
                st: value,
              });
              return command;
            }
          }
        }
        Object.assign(command.sessionMessage, trim({
          tag,
          deviceId,
        }));
        if (this.options.signatureFactory) {
          return runSignatureFactory(this.options.signatureFactory, [this.id])
            .then((signatureResult) => {
              Object.assign(command.sessionMessage, keyRemap({
                signature: 's',
                timestamp: 't',
                nonce: 'n',
              }, signatureResult));
              return command;
            });
        }
        return command;
      })
      .then(this._send.bind(this))
      .then((resCommand) => {
        const {
          peerId,
          sessionMessage: {
            st: token,
            stTtl: tokenTTL,
          },
        } = resCommand;
        if (!peerId) {
          console.warn('Unexpected session opened without peerId.');
          return;
        }
        this.id = peerId;
        if (token) {
          internal(this).sessionToken = new Expirable(token, tokenTTL * 1000);
        }
      }, (error) => {
        if (error.code === ErrorCode.SESSION_TOKEN_EXPIRED) {
          if (internal(this).sessionToken === undefined) {
            // let it fail if sessoinToken not cached but command rejected as token expired
            // to prevent session openning flood
            throw new Error('Unexpected session expiration');
          }
          debug('Session token expired, reopening');
          delete internal(this).sessionToken;
          return this._open(appId, tag, deviceId, isReconnect);
        }
        throw error;
      });
  }

  /**
   * 关闭客户端
   * @return {Promise}
   */
  close() {
    this._debug('close session');
    const command = new GenericCommand({
      cmd: 'session',
      op: 'close',
    });
    return this._send(command).then(
      () => {
        internal(this)._eventemitter.emit('close', {
          code: 0,
        });
        this.emit('close', {
          code: 0,
        });
      }
    );
  }
  /**
   * 获取 client 列表中在线的 client,每次查询最多 20 个 clientId,超出部分会被忽略
   * @param  {String[]} clientIds 要查询的 client ids
   * @return {Primse.<String[]>} 在线的 client ids
   */
  ping(clientIds) {
    this._debug('ping');
    if (!(clientIds instanceof Array)) {
      throw new TypeError(`clientIds ${clientIds} is not an Array`);
    }
    if (!clientIds.length) {
      return Promise.resolve([]);
    }
    const command = new GenericCommand({
      cmd: 'session',
      op: 'query',
      sessionMessage: new SessionCommand({
        sessionPeerIds: clientIds,
      }),
    });
    return this._send(command)
      .then(resCommand => resCommand.sessionMessage.onlineSessionPeerIds);
  }

  /**
   * 获取某个特定的对话
   * @param  {String} id 对话 id,对应 _Conversation 表中的 objectId
   * @param  {Boolean} [noCache=false] 强制不从缓存中获取
   * @return {Promise.<Conversation>} 如果 id 对应的对话不存在则返回 null
   */
  getConversation(id, noCache = false) {
    if (typeof id !== 'string') {
      throw new TypeError(`${id} is not a String`);
    }
    if (!noCache) {
      const cachedConversation = this._conversationCache.get(id);
      if (cachedConversation) {
        return Promise.resolve(cachedConversation);
      }
    }
    return this
      .getQuery()
      .equalTo('objectId', id)
      .find()
      .then(conversations => conversations[0] || null);
  }

  /**
   * 通过 id 批量获取某个特定的对话
   * @since 3.4.0
   * @param  {String[]} ids 对话 id 列表,对应 _Conversation 表中的 objectId
   * @param  {Boolean} [noCache=false] 强制不从缓存中获取
   * @return {Promise.<Conversation[]>} 如果 id 对应的对话不存在则返回 null
   */
  getConversations(ids, noCache = false) {
    const remoteConversationIds =
      noCache ? ids : ids.filter(id => this._conversationCache.get(id) === null);
    return (
      remoteConversationIds.length ?
      this.getQuery().containedIn('objectId', remoteConversationIds).limit(999).find() :
      Promise.resolve()
    ).then(() => ids.map(id => this._conversationCache.get(id)));
  }

  /**
   * 构造一个 ConversationQuery 来查询对话
   * @return {ConversationQuery}
   */
  getQuery() {
    return new ConversationQuery(this);
  }

  _executeQuery(query) {
    const queryJSON = query.toJSON();
    queryJSON.where = new JsonObjectMessage({
      data: JSON.stringify(queryJSON.where),
    });
    const command = new GenericCommand({
      cmd: 'conv',
      op: 'query',
      convMessage: new ConvCommand(queryJSON),
    });
    return this
      ._send(command)
      .then((resCommand) => {
        try {
          return JSON.parse(resCommand.convMessage.results.data);
        } catch (error) {
          const commandString = JSON.stringify(trim(resCommand));
          throw new Error(`Parse query result failed: ${error.message}. Command: ${commandString}`);
        }
      })
      .then(conversations => Promise.all(conversations.map(
        this._parseConversationFromRawData.bind(this)
      )))
      .then(conversations => conversations.map((fetchedConversation) => {
        let conversation = this._conversationCache.get(fetchedConversation.id);
        if (!conversation) {
          conversation = fetchedConversation;
          this._debug('no match, set cache');
          this._conversationCache.set(fetchedConversation.id, fetchedConversation);
        } else {
          this._debug('update cached conversation');
          [
            'creator',
            'createdAt',
            'updatedAt',
            'lastMessageAt',
            'lastMessage',
            'mutedMembers',
            'members',
            '_attributes',
            'transient',
            'muted',
          ].forEach((key) => {
            const value = fetchedConversation[key];
            if (value !== undefined) conversation[key] = value;
          });
          conversation._reset();
        }
        return conversation;
      }));
  }

  _parseConversationFromRawData(rawData) {
    const data = keyRemap({
      objectId: 'id',
      lm: 'lastMessageAt',
      msg: 'lastMessage',
      msg_from: 'lastMessageFrom',
      msg_mid: 'lastMessageId',
      msg_timestamp: 'lastMessageTimestamp',
      patch_timestamp: 'lastMessagePatchTimestamp',
      m: 'members',
      tr: 'transient',
      sys: 'system',
      c: 'creator',
      mu: 'mutedMembers',
    }, rawData);
    return Promise.resolve().then(() => {
      if (data.lastMessage) {
        return this._messageParser.parse(data.lastMessage).then(
          (message) => {
            /* eslint-disable no-param-reassign */
            data.lastMessage = message;
            message.from = data.lastMessageFrom;
            message.id = data.lastMessageId;
            message.timestamp = new Date(data.lastMessageTimestamp);
            if (data.lastMessagePatchTimestamp) {
              message.updatedAt = new Date(data.lastMessagePatchTimestamp);
            }
            message._setStatus(MessageStatus.SENT);
            delete data.lastMessageFrom;
            delete data.lastMessageId;
            delete data.lastMessageTimestamp;
            delete data.lastMessagePatchTimestamp;
            /* eslint-enable no-param-reassign */
          }
        );
      }
      return Promise.resolve();
    }).then(() => new Conversation(data, this));
  }

  /**
   * 创建一个 conversation
   * @param {Object} options 除了下列字段外的其他字段将被视为对话的自定义属性
   * @param {String[]} options.members 对话的初始成员列表,默认包含当前 client
   * @param {String} [options.name] 对话的名字
   * @param {Object} [options.attributes] DEPRECATED: 额外属性,对应 _Conversation 表的 attr 列
   * @param {Boolean} [options.transient=false] 暂态会话
   * @param {Boolean} [options.unique=false] 唯一对话,当其为 true 时,如果当前已经有相同成员的对话存在则返回该对话,否则会创建新的对话
   * @return {Promise.<Conversation>}
   */
  createConversation(options = {}) {
    const {
      members: m,
      name,
      attributes,
      transient,
      unique,
      ...properties
    } = options;
    if (!(transient || Array.isArray(m))) {
      throw new TypeError(`conversation members ${m} is not an array`);
    }
    let members = new Set(m);
    members.add(this.id);
    members = Array.from(members).sort();
    let attr = properties || {};
    if (name) {
      if (typeof name !== 'string') {
        throw new TypeError(`conversation name ${name} is not a string`);
      }
      attr.name = name;
    }
    if (attributes) {
      console.warn('DEPRECATION createConversation options.attributes param: Use options[propertyName] instead. See https://url.leanapp.cn/DeprecateAttributes for more details.');
      attr.attr = attributes;
    }
    attr = new JsonObjectMessage({
      data: JSON.stringify(attr),
    });

    const startCommandJson = {
      m: members,
      attr,
      transient,
      unique,
    };

    return Promise.resolve(
        new GenericCommand({
          cmd: 'conv',
          op: 'start',
          convMessage: new ConvCommand(startCommandJson),
        })
      )
      .then((command) => {
        if (this.options.conversationSignatureFactory) {
          const params = [null, this.id, members, 'create'];
          return runSignatureFactory(this.options.conversationSignatureFactory, params)
            .then((signatureResult) => {
              Object.assign(command.convMessage, keyRemap({
                signature: 's',
                timestamp: 't',
                nonce: 'n',
              }, signatureResult));
              return command;
            });
        }
        return command;
      })
      .then(this._send.bind(this))
      .then(resCommand => new Conversation({
        name,
        attr: attributes,
        transient,
        unique,
        id: resCommand.convMessage.cid,
        createdAt: resCommand.convMessage.cdate,
        updatedAt: resCommand.convMessage.cdate,
        lastMessageAt: null,
        creator: this.id,
        members: transient ? [] : members,
        ...properties,
      }, this))
      .then(tap(conversation =>
        this._conversationCache.set(conversation.id, conversation)
      ));
  }

  /**
   * 将指定的所有会话标记为已读
   * @deprecated 请遍历调用 conversations 的 {@link Conversation#read read} 方法
   * @param {Conversation[]} conversations 指定的会话列表
   * @return {Promise.<Conversation[]>} conversations 返回输入的会话列表
   */
  // eslint-disable-next-line class-methods-use-this
  markAllAsRead(conversations) {
    console.warn('DEPRECATION IMClient.markAllAsRead: Use Conversation#read instead.');
    if (!Array.isArray(conversations)) {
      throw new TypeError(`${conversations} is not an Array`);
    }
    return Promise.all(conversations.map(conversation => conversation.read()));
  }

  // jsdoc-ignore-start
  @throttle(1000)
  // jsdoc-ignore-end
  _doSendRead() {
    // if not connected, just skip everything
    if (!this._connection.is('connected')) return;
    const buffer = internal(this).readConversationsBuffer;
    const conversations = Array.from(buffer);
    if (!conversations.length) return;
    const ids = conversations.map((conversation) => {
      if (!(conversation instanceof Conversation)) {
        throw new TypeError(`${conversation} is not a Conversation`);
      }
      return conversation.id;
    });
    this._debug(`mark [${ids}] as read`);
    buffer.clear();
    this._sendReadCommand(conversations).catch((error) => {
      this._debug('send read failed: %O', error);
      conversations.forEach(buffer.add.bind(buffer));
    });
  }
  _sendReadCommand(conversations) {
    return this._send(new GenericCommand({
      cmd: 'read',
      readMessage: new ReadCommand({
        convs: conversations.map(conversation => new ReadTuple({
          cid: conversation.id,
          mid: (conversation.lastMessage && conversation.lastMessage.from !== this.id)
            ? conversation.lastMessage.id : undefined,
          timestamp: (conversation.lastMessageAt || new Date()).getTime(),
        })),
      }),
    }), false);
  }
}