一套高可用实时消息系统实现


written by Alex Stocks on 2017/12/31,版权所有,无授权不得转载

0 实时消息系统与消息队列系统


实时消息【即时通信】系统,有群聊和单聊两种方式,其形态异于消息队列:

二者之间的本质差异,决定了即时通信系统的技术体系迥异于消息队列系统。

1 极简实现


所谓群聊系统消息,就是一种群聊方式,譬如直播房间内的聊天对应的服务器端就是一个群聊消息系统。

2017年9月初初步实现了一套极简的消息系统,其大致架构如下:

系统名词解释:

1 Client : 消息发布者【其位于服务端内,亦可叫做服务端群聊消息系统调用者,不要与APP Client混淆】,publisher;

2 Proxy : 系统代理,对外统一接口,收集 Client 发来的消息转发给 Broker;

3 Broker :系统消息转发Server,Broker 会根据 Gateway Message 组织一个 RoomGatewayList【key为 RoomID,value为 Gateway IP:Port 地址列表】,然后把 Proxy 发来的消息转发到 Room 中所有成员登录的所有 Gateway;

4 Router :用户登录消息转发者,把 Gateway 转发来的用户登入登出消息转发给所有的 Broker;

5 Gateway :所有服务端的入口,接收合法客户端的连接,并把客户端的登录登出消息通过 Router 转发给所有的 Broker;

6 Room Message : Room 聊天消息;

7 Gateway Message : Room 内某成员 登录 或者 登出 某Gateway消息,包含用户 UIN/RoomID/Gateway 地址 {IP:Port} 等消息;

当一个 Room 中多个用户的 APP 连接一个 Gateway 的时候,Broker只会根据 RoomID 把房间内的消息转发一次给这个Gateway,由Gateway再把消息复制多份分别发送给连接这个 Gateway 的 Room 中的所有用户的客户端。

这套系统有如下特点:

从以上特点,整个消息系统足够简单,不考虑扩缩容问题,当系统负载到达极限的时候,就重新再部署一套系统以应对后端client的消息压力。

这种处理方式本质是把系统的扩容能力甩锅给了后端 Client 以及前端 Gateway:每次扩容一个系统,所有 Client 需要在本地配置文件中添加一个 Proxy 地址然后全部重启,所有 Gateway 则需要再本地配置文件添加一个 Router 地址然后全部重启。

这种“幸福我一人,辛苦千万家”的扩容应对方式,必然导致公司内部这套系统的使用者怨声载道,升级之路就是必然的了。

2 可扩展


大道之行也,天下为公,不同的系统有不同的构架,相同的系统总有类似的实现。类似于数据库的分库分表【关于分库分表,目前看到的最好的文章是参考文档1】,其扩展实现核心思想是分 Partition 分 Replica,但各 Replica 之间还区分leader(leader-follower,只有leader可接受写请求)和 non-leader(所有replica均可接收写请求)两种机制。

从数据角度来看,这套系统接收两种消息:Room Message(房间聊天消息)和 Gateway Message(用户登录消息)。两种消息的交汇之地就是Broker,所以应对扩展的紧要地方就是 Broker,Broker 的每个 Partition 采用 non-leader 机制,各 replica 均可接收 Gateway Message 消息写请求和 Room Message 转发请求。

首先,当 Room Message 量加大时可以对 Proxy 进行水平扩展,多部署 Proxy 即可因应 Room Message 的流量。

其次,当 Gateway Message 量加大时可以对 Router 进行水平扩展,多部署 Router 即可因应 Gateway Message 的流量。

最后,两种消息的交汇之地 Broker 如何扩展呢?可以把若干 Broker Replica 组成一个 Partition,因为 Gateway Message 是在一个 Partition 内广播的,所有 Broker Replica 都会有相同的 RoomGatewayList 数据,因此当 Gateway Message 增加时扩容 Partition 即可。当 Room Message 量增加时,水平扩容Partition 内的 Broker Replica 即可,因为 Room Message 只会发送到 Partition 内某个R eplica 上。

从个人经验来看,Room ID 的增长以及 Room 内成员的增加量在一段时间内可以认为是线性增加,而 Room Message 可能会以指数级增长,所以若部署得当则 Partition 扩容的概率很小,而 Partition 内 Replica 水平增长的概率几乎是 100%。

不管是 Partition 级别的水平扩容还是 Partition Replica 级别的水平扩容,不可能像系统极简版本那样每次扩容后都需要 Client 或者 Gateway 去更新配置文件然后重启,因应之道就是可用 zookeeper 充当角色的 Registry。通过这个 zookeeper 注册中心,相关角色扩容的时候在 Registry 注册后,与之相关的其他模块得到通知即可获取其地址等信息。采用 zookeeper 作为 Registry 的时候,所以程序实现的时候采用实时watch和定时轮询的策略保证数据可靠性,因为一旦网络有任何的抖动,zk就会认为客户端已经宕机把链接关闭。

分析完毕,与之相对的架构图如下:

分章节描述各个模块详细流程。

2.1 Client


Client详细流程如下:

本系统使用 Zookeeper 作为 Registry,而 Zookeeper 的通知机制有名的不可靠且延迟高(15s),所以采用 Zookeeper 自身的 Watch 机制的同时使用定时查询策略作为补偿机制。定时策略的定时时间单位应该是分钟级,不能小于 Zookeeper 自身的通知延时时长,以防止 Zookeeper 系统的轻微抖动造成消息系统的服务波动。

2.2 Proxy


Proxy详细流程如下:

    Broker Partition Number;
    ​新的Broker Partition(此时发生了扩容);
    Broker Partition内新的broker replica(Partition内发生了replica扩容);
    Broker Parition内某replica挂掉的信息;

之所以把 Room Message 和 Heartbeat Message 放在一个线程处理,是为了防止进程假死这种情况。

/pubsub/broker/partition_num 的值发生改变的时候(譬如值改为4),意味着 Router Partition 进行了扩展,Proxy 需及时获取新 Partition 路径(如 /pubsub/broker/Partition2/pubsub/broker/Partition3)下的实例,并关注这些路径,获取新 Partition 下的实例。

之所以 Proxy 在获取 Registry 下所有当前的 Broker 实例信息后再注册自身信息,是因为此时 Broker 子系统扩容完成,才具有转发消息的资格。

Proxy 转发某个 Room 消息时候,只发送给处于 Running 状态的 Broker。为 Broker Partition 内所有 replica 依据Registry 给其分配的 replicaID 进行递增排序,组成一个 Broker Replica Array,规则中BrokerPartitionReplicaNum 为 Array 的 size,而 BrokerReplicaID 为 replica 在 Array 中的下标。

2.2.1 Pipeline


收到的 Room Message 需要做三部工作:收取 Room Message、消息协议转换和向 Broker 发送消息。

初始系统这三步流程如果均放在一个线程内处理,proxy 的整体吞吐率只有 50 000 Msg/s,最后的实现方式是按照消息处理的三个步骤以 pipeline 方式做如下流程处理:

经过以上流水线改造后,Proxy 的整体吞吐率可达 200 000 Msg/s。

2.2.2 大房间消息处理


每个 Room 的人数不均,最简便的解决方法就是给不同人数量级的 Room 各搭建一套消息系统,不用修改任何代码。

然所谓需求推动架构改进,在系统迭代升级过程中遇到了这样一个需求:业务方有一个全国 Room,用于给所有在线用户进行消息推送。针对这个需求,不可能为了一个这样的 Room 单独搭建一套系统,况且这个 Room 的消息量很少。

如果把这个 Room 的消息直接发送给现有系统,它有可能影响其他 Room 的消息发送:消息系统是一个写放大的系统,全国 Room 内有系统所有的在线用户,每次发送都会卡顿其他 Room 的消息发送。

最终的解决方案是:使用类似于分区的方法,把这样的大 Room 映射为 64 个虚拟 Room【称之为 VRoom】。在 Room 号段分配业务线的配合下,给消息系统专门保留了一个号段,用于这种大 Room 的切分,在 Proxy 层依据一个 hash 方法 【 VRoomID = UserID % 64】 把每个 User 分配到相应的 VRoom,其他模块代码不用修改即完成了大 Room 消息的路由。

2.3 Broker


Broker详细流程如下:

注意 Broker 之所以先注册然后再加载 Database 中的数据,是为了在加载数据的时候同时接收 Router 转发来的 Gateway Message,但是在数据加载完前这些受到的数据先被缓存起来,待所有 RoomGatewayList 数据加载完后就把这些数据重放一遍;

Broker 之所以区分状态,是为了在加载完毕 RoomGatewayList 数据前不对 Proxy 提供转发消息的服务,同时也方便 Broker Partition 应对消息量增大时进行水平扩展。

当 Broker 发生 Partition 扩展的时候,新的 Partition 个数必须是 2 的幂,只有新 Partition 内所有 Broker Replica 都加载实例完毕,再更改 /pubsub/broker/partition_num 的值。

老的 Broker 也要 watch 路径 /pubsub/broker/partition_num 的值,当这个值增加的时候,它也需要清洗本地的路由信息缓存。

Broker 的扩容过程犹如细胞分裂,形成中的两个细胞有着完全相同的数据,分裂完成后【Registry路径 /pubsub/broker/partition_num 的值翻倍】则需要清洗垃圾信息。这种方法称为翻倍法。

2.4 Router


Router详细流程如下:

    Broker Partition Number;
    ​新的Broker Partition(此时发生了扩容);
    Broker Partition内新的broker replica(Partition内发生了replica扩容);
    Broker Parition内某replica挂掉的信息;

2.5 Gateway


Gateway详细流程如下:

Gateway 本地有一个基于共享内存的 LRU Cache,存储最近一段时间发送的消息的 MessageID。

3 系统稳定性


系统具有了可扩展性仅仅是系统可用的初步,整个系统要保证最低粒度的 SLA(0.99),就必须在两个维度对系统的可靠性就行感知:消息延迟和系统内部组件的高可用。

3.1 消息延迟


准确的消息延迟的统计,通用的做法可以基于日志系统对系统所有消息或者以一定概率抽样后进行统计,但限于人力目前没有这样做。

目前使用了一个方法:通过一种构造一组伪用户 ID,定时地把消息发送给 proxy,每条消息经过一层就把在这层的进入时间和发出时间以及组件自身的一些信息填入消息,这组伪用户的消息最终会被发送到一个伪 Gateway 端,伪 Gateway 对这些消息的信息进行归并统计后,即可计算出当前系统的平均消息延迟时间。

通过所有消息的平均延迟可以评估系统的整体性能。同时,因为系统消息路由的哈希方式已知,当固定时间内伪 Gateway 没有收到消息时,就把消息当做发送失败,当某条链路失败一定次数后就可以产生告警了。

3.2 高可用


上面的方法同时能够检测某个链路是否出问题,但是链路具体出问题的点无法判断,且实时性无法保证。

为了保证各个组件的高可用,系统引入了另一种评估方法:每个层次都给后端组件发送心跳包,通过心跳包的延迟和成功率判断其下一级组件的当前的可用状态。

譬如 proxy 定时给每个 Partition 内每个 broker 发送心跳,可以依据心跳的成功率来快速判断 broker 是否处于“假死”状态(最近业务就遇到过 broker 进程还活着,但是对任何收到的消息都不处理的情况)。

同时依靠心跳包的延迟还可以判断 broker 的处理能力,基于此延迟值可在同一 Partition 内多 broker 端进行负载均衡。

4 消息可靠性


公司内部内部原有一个走 tcp 通道的群聊消息系统,但是经过元旦一次大事故(几乎全线崩溃)后,相关业务的一些重要消息改走这套基于 UDP 的群聊消息系统了。这些消息如服务端下达给客户端的游戏动作指令,是不允许丢失的,但其特点是相对于聊天消息来说量非常小(单人1秒最多一个),所以需要在目前UDP链路传递消息的基础之上再构建一个可靠消息链路。

国内某 IM 大厂的消息系统也是以 UDP 链路为基础的,他们的做法是消息重试加 ack 构建了可靠消息稳定传输链路。但是这种做法会降低系统的吞吐率,所以需要独辟蹊径。

UDP 通信的本质就是伪装的 IP 通信,TCP 自身的稳定性无非是重传、去重和 ack,所以不考虑消息顺序性的情况下可以通过重传与去重来保证消息的可靠性。

基于目前系统的可靠消息传输流程如下:

正常的消息在群聊消息系统中传输时,Proxy 会根据消息的 Room ID 传递给固定的 Broker,以保证消息的有序性。

5 Router群集


当线上需要部署多套群聊消息系统的时候,Gateway 需要把同样的 Room Message 复制多份转发给多套群聊消息系统,会增大 Gateway 压力,可以把 Router 单独独立部署,然后把 Room Message 向所有的群聊消息系统转发。

Router 系统原有流程是:Gateway 按照 Room ID 把消息转发给某个 Router,然后 Router 把消息转发给下游 Broker 实例。新部署一套群聊消息系统的时候,新系统 Broker 的schema 需要通过一套约定机制通知 Router,使得 Router 自身逻辑过于复杂。

重构后的 Router 架构参照上图,也采用分 Partition 分 Replica 设计,Partition 内部各 Replica 之间采用 non-leader 机制;各 Router Replica 不会主动把Gateway Message 内容 push 给各 Broker,而是各 Broker 主动通过心跳包形式向 Router Partition 内某个 Replica 注册,而后此 Replica 才会把消息转发到这个 Broker 上。

类似于 Broker,Router Partition 也以 2 倍扩容方式进行 Partition 水平扩展,并通过一定机制保证扩容或者 Partition 内部各个实例停止运行或者新启动时,尽力保证数据的一致性。

Router Replica 收到 Gateway Message 后,replica 先把 Gateway Message 转发给 Partition 内各个 peer replica,然后再转发给各个订阅者。Router 转发消息的同时异步把消息数据写入 Database。

独立 Router 架构下,下面分别详述 Gateway、Router 和 Broker 三个相关模块的详细流程。

5.1 Gateway


Gateway详细流程如下:

    Router Partition Number;
    新的Router Partition(此时发生了扩容);
    Partition内新的replica(Partition内发生了replica扩容);
    Parition内某replica挂掉的信息;

第六步的规则决定了 Gateway Message 的目的 Partition 和 replica,规则内容有:

如果某 Router Partition ID 满足 condition RoomID % RouterPartitionNumber == RouterPartitionID % RouterPartitionNumber,则把消息转发到此 Partition;

这里之所以不采用直接 hash 方式 RouterPartitionID = RoomID % RouterPartitionNumber 获取 Router Partition,是考虑到当 Router 进行 2 倍扩容的时候当所有新的 Partition 的所有 Replica 都启动完毕且数据一致时才会修改 Registry 路径 /pubsub/router/partition_num 的值,按照规则的计算公式才能保证新 Partition 的各个 Replica 在启动过程中就可以得到 Gateway Message,也即此时每个 Gateway Message 会被发送到两个 Router Partition。
当 Router 扩容完毕,修改 Registry 路径 /pubsub/router/partition_num 的值后,此时新集群进入稳定期,每个 Gateway Message 只会被发送固定的一个Partition,condition RoomID % RouterPartitionNumber == RouterPartitionID % RouterPartitionNumber 等效于 RouterPartitionID = RoomID % RouterPartitionNumber

如果 Router Partition 内某 replia 满足 condition replicaPartitionID = RoomID % ReplicaNumber,则把消息转发到此replica。

replica 向 Registry 注册的时候得到的 ID 称之为 replicaID,Router Parition 内所有 replica 按照 replicaID 递增排序组成 replica 数组 RouterPartitionReplicaArray,replicaPartitionID 即为 replica 在数组中的下标。

5.1.1 Gateway Message 数据一致性

Gateway 向 Router 发送的 Router Message 内容有两种:某 user 在当前 Gateway 上进入某 Room 和某 user 在当前 Gateway 上退出某 Room,数据项分别是 UIN(用户ID)、Room ID、Gateway Addr和User Action(Login or Logout)。

由于所有消息都是走 UDP 链路进行转发,则这些消息的顺序就有可能乱序。Gateway 可以统一给其发出的所有消息分配一个全局递增的 ID【下文称为 GatewayMsgID,Gateway Message ID】以保证消息的唯一性和全局有序性。

Gateway 向 Registry 注册临时有序节点时,Registry 会给 Gateway 分配一个 ID,Gateway 可以用这个 ID 作为自身的 Instance ID【假设这个ID上限是 65535】。

GatewayMsgID 字长是 64 bit,其格式如下:

// 63 -------------------------- 48 47 -------------- 38 37 ------------ 0
// |  16bit Gateway Instance ID    |   10bit Reserve    |    38bit自增码  |

5.2 Router


Router 系统部署之前,先设置 Registry 路径 /pubsub/router/partition_num 的值为1。

Router 详细流程如下:

    Router Partition Number;
    Partition内新的replica(Partition内发生了replica扩容);
    Parition内某replica挂掉的信息;

之所以把 Gateway Message 和 Heartbeat Message 放在一个线程处理,是为了防止进程假死这种情况。

Broker 也采用了分 Partition 分 Replica 机制,所以向 Broker 转发 Gateway Message 时候路由规则,与 Gateway 向 Router 转发消息的路由规则相同。

另外启动一个工具,当水平扩展后新启动的 Partition 内所有 Replica 的状态都是 Running 的时候,修改 Registry 路径 /pubsub/router/partition_num 的值为所有 Partition 的数目。

5.3 Broker


Broker 详细流程如下:

    Router Partition Number;
    新的Router Partition(此时发生了扩容);
    Partition内新的replica(Partition内发生了replica扩容);
    Parition内某replica挂掉的信息;

BrokerPartitionNumber 可以小于或者等于或者大于 RouterPartitionNumber,两个数应该均是2的幂,两个集群可以分别进行扩展,互不影响。譬如BrokerPartitionNumber=4 而 RouterPartitionNumber=2,则 Broker Partition 3 只需要向 Router Partition 1 的某个 follower 发送心跳消息即可;若 BrokerPartitionNumber=4 而 RouterPartitionNumber=8,则 Broker Partition 3 需要向 Router Partition 3 的某个 follower 发送心跳消息的同时,还需要向 Router Partition 7 的某个 follower 发送心跳,以获取全量的 Gateway Message。

Broker 需要关注 /pubsub/router/partition_num/pubsub/broker/partition_num 的值的变化,当 router 或者 broker 进行 parition 水平扩展的时候,Broker 需要及时重新构建与 Router 之间的对应关系,及时变动发送心跳的 Router Replica 对象【RouterPartitionID = BrokerReplicaID % RouterPartitionNum,RouterPartitionID 为 Router Replica 在 PartitionRouterReplicaArray 数组的下标】。

当 Router Partition 内 replica 死掉或者发送心跳包的 replica 对象死掉(无论是注册中心通知还是心跳包超时),broker 要及时变动发送心跳的 Router replica 对象。

另外,Gateway 使用 UDP 通信方式向 Router 发送 Gateway Message,如若这个 Message 丢失则此 Gateway 上该 Room 内所有成员一段时间内(当有新的成员在当前 Gateway 上加入 Room
时会产生新的 Gateway Message)都无法再接收消息,为了保证消息的可靠性,可以使用这样一个约束解决问题:在此Gateway上登录的某 Room 内的人数少于 3 时,Gateway 会把 Gateway Message 复制两份非连续(如以10ms为时间间隔)重复发送给某个 Partition leader。因 Gateway Message 消息处理的幂等性,重复 Gateway Message 并不会导致 Room Message 发送错误,只在极少概率的情况下会导致 Gateway 收到消息的时候 Room 内已经没有成员在此 Gateway 登录,此时 Gateway 会把消息丢弃不作处理。

传递实时消息群聊消息系统的 Broker 向特定 Gateway 转发 Room Message 的时候,会带上 Room 内在此 Gateway 上登录的用户列表,Gateway 根据这个用户列表下发消息时如果检测到此用户已经下线,在放弃向此用户转发消息的同时,还应该把此用户已经下线的消息发送给 Router,当 Router 把这个消息转发给 Broker 后,Broker 把此用户从用户列表中剔除。通过这种负反馈机制保证用户状态更新的及时性

6 离线消息


前期的系统只考虑了用户在线情况下实时消息的传递,当用户离线时其消息便无法获取。若系统考虑用户离线消息传递,需要考虑如下因素:

离线消息的存储和传输,需要考虑用户的状态以及每条消息的发送状态,整个消息核心链路流程会有大的重构。新消息架构如下图:

系统名词解释:

1 Pi : 消息ID存储模块,存储每个人未发送的消息 ID 有序递增集合;

2 Xiu : 消息存储KV模块,存储每个人的消息,给每个消息分配 ID,以 ID 为 key,以消息内为 value;

3 Gateway Message(HB) : 用户登录登出消息,包括APP保活定时心跳(Hearbeat)消息;

系统内部代号貔貅(貔貅者,雄貔雌貅),源自上面两个新模块。

这个版本架构流程的核心思想为“消息ID与消息内容分离,消息与用户状态分离”。消息发送流程涉及到模块 Client/Proxy/Pi/Xiu,消息推送流程则涉及到模块 Pi/Xiu/Broker/Router/Gateway。

下面先细述Pi和Xiu的接口,然后再详述发送和推送流程。

6.1 Xiu


Xiu 模块功能名称是 Message Storage,用户缓存和固化消息,并给消息分配 ID。Xiu 集群采用分 Partition 分 Replica 机制,Partition 初始数目须是2的倍数,集群扩容时采用翻倍法。

6.1.1 存储消息

存储消息请求的参数列表为 {SnowflakeID,UIN, Message},其流程如下:

6.1.2 读取消息

读取消息请求的参数列表为{UIN, MsgIDList},其流程为:

6.1.3 主从数据同步

目前从简,暂定 Xiu 的副本只有一个。

Xiu 节点启动的时候根据自身配置文件中分配的 Xiu_Partition_ID 到 Registry 路径 /pubsub/xiu/partition_id 下进行注册一个临时有序节点,注册成功则 Registry 会返回 Xiu 的节点 ID。

Xiu节点获取 /pubsub/xiu/partition\_id 下的所有节点的 ID 和地址信息,依据 节点ID最小者为leader 的原则,即可判定自己的角色。只有 leader 可接受读写数据请求。

数据同步流程如下:

follower 会关注 Registry 路径 /pubsub/xiu/partition_id 下所有所有节点的变化情况,如果 leader 挂掉则及时转换身份并接受客户端请求。如果 follower 与 leader 之间的心跳超时,则 follower 删掉 leader 的 Registry 路径节点,及时进行身份转换处理客户端请求。

当 leader 重启或者 follower 转换为 leader 的时候,需要把 Partition_Msg_ID 进行一个大数值增值(譬如增加1000)以防止可能的消息 ID 乱序情况。

6.1.4 集群扩容

Xiu 集群扩容采用翻倍法,扩容时新 Partition 的节点启动后工作流程如下:

另外启动一个工具,当水平扩展后所有新启动的 Partition 内所有 Replica 的状态都是 Running 的时候,修改 Registry 路径 /pubsub/xiu/partition_num 的值为扩容后 Partition 的数目。按照开头的例子,即由2升级为4。

之所以 Xiu 不用像 Broker 和 Router 那样启动的时候向老的 Partition 同步数据,是因为每个 Xiu 分配的 MsgID 中已经带有 Xiu 的 PartitionID 信息,即使集群扩容这个 ID 也不变,根据这个ID也可以定位到其所在的Partition,而不是借助 hash 方法。

6.2 Pi


Pi 模块功能名称是 Message ID Storage,存储每个用户的 MsgID List。Xiu 集群也采用分 Partition 分 Replica 机制,Partition 初始数目须是2的倍数,集群扩容时采用翻倍法。

6.2.1 存储消息ID

MsgID 存储的请求参数列表为{UIN,MsgID},Pi 工作流程如下:

Pi有专门的日志记录线程,给每个日志操作分配一个 LogID,每个 Log 文件记录一定量的写操作,当文件 size 超过配置的上限后删除之。

6.2.2 读取消息ID列表

读取请求参数列表为{UIN, StartMsgID, MsgIDNum, ExpireFlag},其意义为获取用户 UIN 自起始ID为 StartMsgID 起(不包括 StartMsgID )的数目为 MsgIDNum 的消息ID列表,ExpireFlag意思是 所有小于等于 StartMsgID 的消息ID是否删除。 流程如下:

6.2.3 主从数据同步

同 Xiu 模块,暂定 Pi 的同 Parition 副本只有一个。

Pi 节点启动的时候根据自身配置文件中分配的 Pi_Partition_ID 到Registry路径 /pubsub/pi/partition_id 下进行注册一个临时有序节点,注册成功则 Registry 会返回 Pi 的节点 ID。

Pi 节点获取 /pubsub/pi/partition_id 下的所有节点的ID和地址信息,依据 节点ID最小者为leader 的原则,即可判定自己的角色。只有 leader 可接受读写数据请求。

数据同步流程如下:

follower 会关注 Registry 路径 /pubsub/pi/partition_id 下所有节点的变化情况,如果 leader 挂掉则及时转换身份并接受客户端请求。如果follower 与 leader 之间的心跳超时,则follower删掉 leader 的 Registry 路径节点,及时进行身份转换处理客户端请求。

6.2.4 集群扩容

Pi 集群扩容采用翻倍法。则节点启动后工作流程如下:

Proxy 会把读写请求参照条件 UIN % Pi\_Partition\_Num == Pi\_Partition\_ID % Pi\_Partition\_Num 向相关 partition 的 leader 转发用户请求。假设原来 PartitionNumber 值为2,扩容后值为4,则原来转发给 partition0 的写请求现在需同时转发给 partition0 和 partition2,原来转发给 partition1 的写请求现在需同时转发给 partition1 和 partition3。

另外启动一个工具,当水平扩展后所有新启动的 Partition 内所有 Replica 的状态都是 Running 的时候,修改Registry路径/pubsub/xiu/partition_num的值为扩容后 Partition 的数目。

6.3 数据发送流程


消息自 PiXiu 的外部客户端(Client,服务端所有使用 PiXiu 提供的服务者统称为客户端)按照一定负载均衡规则发送到 Proxy,然后存入 Xiu 中,把 MsgID 存入 Pi 中。其详细流程如下:

6.4 数据转发流程


转发消息的主体是Broker,原来的在线消息转发流程是它收到 Proxy 转发来的 Message,然后根据用户是否在线然后转发给 Gateway。

PiXiu架构下 Broker 会收到以下类型消息:

Broker流程受这五种消息驱动,下面分别详述其收到这五种消息时的处理流程。

用户登录消息流程如下:

可以把用户心跳消息当做用户登录消息处理。

Gateway的用户登出消息产生有三种情况:

用户登出消息处理流程如下:

处理 Proxy 发来的 Notify 消息处理流程如下:

所谓 Ack 消息,就是 Broker 经 Gateway 把消息转发给 App 后,App 给Broker的消息回复,告知Broker其最近成功收到消息的 MsgID。
Ack 消息处理流程如下:

总体上,PiXiu 转发消息流程采用拉取(pull)转发模型,以上面五种消息为驱动进行状态转换,并作出相应的动作行为。

7 单聊消息


前几章所述之系统架构均基于群聊这一实时通信场景,此群聊消息系统在公司内部稳定运行近一年半时间,期间经历了6个大版本的迭代演进,赢得了各个业务线同事的信任,承担了从群聊、聊天室、语音传递、到游戏信令传递等等各种场景的群聊实时通信业务。

2017 年 9 月份刚接手系统时,已有业务方提出把对单人下发的消息【以下称之为 Chat Message】也接入这套系统,即消息系统不仅是一个群聊消息下发通道,也应该是一个单聊消息下发通道,但是考虑到当时处于系统实现初期,个人基于系统小作的考量直接拒绝了。随着这一年半其接入的业务范围的扩展,目下公司正考虑拆分 Gateway,这套消息系统需要被重构以处理单聊消息就是势然了。

公司的 Gateway 大概是我见过的最复杂的接口层系统,各种本应该放在逻辑层处理的子系统都被糅合进了接口层 Gateway 这一单个模块之内,除了公司几个创始人外很少有人清楚其它内部逻辑,据说陆续有四个高薪招聘的高手在接手这套系统后三个月内跑路了。Gateway 内部与聊天场景有关的状态数据主要有两种:用户所在的群信息【以下简称 Router Data】以及用户自身的状态信息【包括在线状态以及客户端地址信息,以下简称 Relay Data】。

Router Data 已经被群聊系统的 Router 模块替代掉,当下的 Gateway 已经不需要存储 Router Data。若扩展此系统使其能够处理单聊消息,能够存储用户的在线状态以及用户接入的 Gateway 信息,则 Gateway 就可以不用存储任何有关用户的状态数据了。

7.1 实时消息系统架构


Relay Data 迥异于 Router Data:Relay Data 的 key 是 UIN,其数据传递依赖于 Relay Message;而 Router Data 的 key 是 GroupID,其数据传递依赖于 Gateway Message。此二者差异决定了这两种数据的处理不可能在一个单一的模块之内。

新的实时通信系统添加了一个 Relay 模块用于处理 Relay Message 并存储 Relay Data。添加了单聊消息处理能力的实时消息系统架构如下:

下面详述新架构下各个模块的功能和消息处理流程,至于系统注册、系统扩容以及注册通知等相关流程与以往处理机制雷同,下面不再详述。

注意:新架构图中把原来的专门存储 Router Data 的 Database 模块改称为 Router DB,并添加了一个 Relay DB,以专门存储 Relay Data。

7.2 Gateway

Gateway 不再存储 Router Data 和 Relay Data,几乎成了 APP 的透明代理,其功能如下:

7.3 Relay

Relay 是一个新模块,但其组织方式类似于 Router,亦是分 Partition 分 Replica,处理 Relay Message。

Relay 模块依据用户的 UIN 进行把不同用户的 Relay Data 放入不同的 Parition,同 Partition 内的 所有 Relay Replica 数据一致。

Relay 功能列表如下:

前面提及所有处理 Room Message 系统都是一个写放大的系统,究其原因是一条 Room Message 需要被复制下发给 Room 内每个人,所以 Relay 处理 Room Message 时需要根据 Room Message 内的 Room UIN List 对消息进行复制后下发给每个 User APP 所连接的 Gateway。

当然,如果用户不在线,Relay 会对 Room Message 和 Chat Message 都作丢弃处理,因为此系统只处理实时消息。

Relay 承担了群聊消息与单聊消息的下发。

7.4 Broker

新架构下 Broker 模块不再直接把 Room Message 转发给 Gateway,而是转发给 Relay。有了 Relay,Broker 自身只需要存储每个 Group 的 UIN List 即可,大大减轻了自身的任务流程,Broker 可以把 Relay 视作 pseudo-gateway

Broker 功能列表如下:

Broker 仅仅承担了群聊消息下发。

7.4 Proxy

Proxy 接收 Client 发来的消息时,需要区分消息是 Room Message 还是 Chat Message,新架构下其功能列表如下:

Proxy 对 Group Message 的处理仅仅是转发了事,并不需要复制,所以不存在写放大的问题。

其实还可以把 Proxy 区分为 Group Message Proxy 和 Chat Message Proxy,以期收逻辑清晰职责明了之效。

8 总结


这套群聊消息系统尚有以下task list需完善:

此记。

参考文档

Payment

Timeline