这段时间对消息总线进行了再次重构。本次重构主要针对消息总线的 简化client的复杂度之前的client需要同时连接两个分布式组件。消息总线的访问需要用户提供 当时的想法是出于安全的角度考虑,不让用户直面MQ Server,而MQ的选择理论上可以有多种,这些对用户都是透明的。但作为一个后端组件,安全问题本不是最重要的关注目标,而替换MQ这样的成本无异于重写消息总线,这样的可能性也不大。除此之外这还带来了额外的复杂度与高失败率(当pubsuber与RabbitMQ两者中有其一失败,消息总线就将陷入混乱),结合到消息总线较多的长连接场景(比如,push模式的consume),一旦一个组件失效就可能导致客户程序的重启(为了重新初始化连接)。 让消息总线客户端只连接单一的RabbitMQ组件,可以大大降低失效的概率,而且RabbitMQ官方client提供的失效重试机制也能更好得发挥作用。 用RPC获取授权信息因为之前的pubsuber部分承担了授权信息数据源的角色,移除之前的pubsuber组件,那么就需要重新设计获取远程授权信息的方案。因为RabbitMQ正好提供了基于JSON的轻量级RPC机制,我们就可以通过RPC从后端获取授权信息,而让后端去跟pubsuber交互。之前就曾有过这个想法,后来在使用HBase时,发现其java client内部也有通过RPC跟Master节点交互,于是这次就确定用这种方式来实现。其实采用RPC的形式可以大大简化客户端的逻辑实现,而且也大大降低了升级成本。 修改broadcast和pubsub的实现之前pubsuber在客户端还起到了两个作用: 我们新建了一个内部使用的exchange来实现消息路由。跟其他topic类型的exchange不同,这里我们采用并不常见的headers类型的exchange。 header类型:根据消息的消息头里包含特殊的key-value对来进行路由 考虑到我们需要重新实现上面两个功能,所以,我们将消息分成两类:
在发送这两个消息时,只需要在消息头中指定对应的header的key-value即可实现自动路由。这两种消息类型分别对应的是绑定在inner exchange后的队列的类型。 如何节省RabbitMQ Server的资源考虑到几乎每个client都有接受这两种类型的消息的需求,而我们为每个客户端在该exchange下创建两个queue多少有些过于浪费,最好的做法当然是在客户端使用的会话周期内建立两个临时队列,等客户端使用结束就可立即销毁队列回收资源。 得益于RabbitMQ丰富的特性,我们可以很容易做到这一点。当我们实例化客户端的时候,我们在内部创建两个 临时且排他属性是通过在创建队列时,指定队列的 这两个队列被创建后,当前客户端会作为消费者立即挂载在队列上等待 发送端无需知晓上述两个队列的具体名称,它只需知道代理exchange以及inner exchange的routing-key即可,然后在发送消息的消息头中指定需要发送的是 代码片段: 去除pubsuber的封装为什么要封装在去除之前,我想谈谈当初为什么要封装。在最初封装消息总线的时候,我对redis和zookeeper都有所了解,它们都有一些共同的特性,比如:
这是消息总线客户端的pubsuber需要的,但为了提供可选性,我在这两个特性上做了一层封装,可以使得这种配置变更组件无论选择哪一个,无需修改代码,两者都可适配。这是当初封装的目的。 为什么要去除封装首先,去除封装是回到了zookeeper而排除了redis。这么除了发现太多的开源软件都在使用zookeeper来实现这个需求,除了发现这是zookeeper的专长,而redis只是能提供这些功能而已。除了这些,最关键的问题是我发现当涉及到 在分布式的服务中,很可能会存在多个组件,而这些组件跟应用之间会存在一些逻辑关系,而不都是简单的扁平关系。很多情况下,我们需要将一些关系构建成树状结构。比如,现在消息总线只变成了平台中的一个组件,我们需要在配置上体现这种关系,所以可能会由原先的扁平关系修改为如下图这样的形式: 在这种类似于文件系统的树状结构下,要实现诸如 一些思考拓扑的权衡消息总线最初的目标主要偏向消息传递。但在实现中的添加了一些额外的特性,比如之前的RPC功能。其实,如果单从技术层面上来看消息总线就是收发消息。但如果你将收发消息的主体包含进来(也就是发送者和消费者)会有一些新的定位。如果有一些消费者做的事情是很
消息总线可以直接提供这些服务以供第三方申请使用,当然如果带上语义来看,RPC的服务端也是一种服务(只不过是同步的服务而已),其他队列也可能在提供某种服务,只是它们的专有性更强,所以消息总线也具有提供服务的能力以及构建服务的基础。 所以我一直在考虑,整个路由图应该是这样构建: 还是这样来构建: 消息租赁因为消息的通信模型生来就具有异步性。那么消息的消费时机,对于消息总线本身而言是无法知晓的,这就产生了消息长期堆积压垮消息总线的可能。所以,可能会考虑将消息的永久驻留改为按序驻留。这取决于业务,有些业务消息是具有时效性的,这样的消息如果隔个几周还没有被消费掉,那么它存在的意义几乎没有了,而它却白白占据着总线服务器的内存或磁盘资源,甚至这些消息将永远得不到消费也有可能。 所谓消息租赁,其实是将现在的永久留存的模式改为临时驻留队列的模式,具体消息能够存活多久,这取决于给消息设置的TTL(time to live)时间,而对于TTL的评估来自于队列申请方根据自身的业务特点而定。当然,TTL可以也可以设置为永久,这需要接收审核。 proxy的必要性对于一个组件的扩展有很多种模式,比如 而对于 现在对RabbitMQ的扩展采用的是 我曾看到携程开源的消息系统是在 来源:CSDN |
|
声明:文章版权归原作者所有 部分文章转自互联网 如有侵权请联系
[邮箱地址] 删除
|