RocketMQ组件


RocketMQ组件

RocketMQ简单的看是由四个组件构成的,分别是NameServeBrokerProducerconsumer这四个组件构成的;

一个简单的执行过程是:

  1. consumer通过向NameServer询问具体的Topic所在的Broker地址
  2. consumer通过向Broker保持长连接的形式,获取到信息
  3. 同一组下的consumer会向broker中的不同MessageQueue根据offset的位置进行获取msg,offeset会根据不同的模式有不同的实现,如果是广播模式offset会保存在不同的consumer中,如果是集群模式offset是保存在broker中的

下面依次来介绍一下这几个组件

NameServer

NameServer是整个消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息。同时,各个角色的机器都要定期向NameServer上报自己的状态,超时不上报的话,NameServer会认为某个机器出故障不可用了,其他的组件会把这个机器从可用列表里移除。
NameServer本身是无状态的,也就是说NameServer中的Broker、Topic等状态信息不会持久存储,都是由各个角色定时上报并存储到内存中的

NameServer和zk的对比:

  • NameServer只是作为一个轻量级的元数据服务器
  • zk是一个分布式应用程序提供协调服务
    RocketMQ的NameServer只有很少的代码,容易维护,所以不需要再依赖另一个中间件,从而减少整体维护成本

Broker

Broker主要负责消息的存储、传递和查询,以及服务的高可用保证。在Broker的Master-Slave架构中,Broker分为Master和Slave。一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master和Slave的对应关系是通过指定相同的BrokerName和不同的BrokerId来定义的。BrokerId 为 0 表示 Master,非 0 表示 Slave;

  • ROCKETMQ 架构总结

每个Broker都与NameServer集群中的所有节点建立长期连接,并定期向所有 NameServer 注册 Topic 信息。Producer与NameServer集群中的一个节点建立长连接,定时从NameServer获取topic路由信息,与提供Topic服务的Master建立长连接,定时向Master发送心跳。Producer是完全无状态的。Consumer与NameServer集群中的一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,与提供 Topic 服务的 Master 和 Slave 建立长连接,定期向 Master 和 Slave 发送心跳。Consumer 从 Master 订阅 Topic或奴隶。

Producer

Producer是作为数据源,将消息优化、写入和发布到一个或多个主题。生产者通过 MessageQueue 在代理之间负载平衡数据。它支持快速失败和发送消息期间的重试。

Consumer

Consumer是通过订阅Topic来读取消息,有两种消费的方式服务端推送或者客户端拉取;消费者需要在消费的时候作业务幂等和可靠性保证;

  • 执行过程
  1. Consumer 启动时需要指定 Namesrv 地址,与其中一个 Namesrv 建立长连接。消费者每隔 30 秒从 Namesrv 获取所有Topic 的最新队列情况
  2. 消费者端的负载均衡。根据消费者的消费模式不同,负载均衡方式也不同。

消费者消费模式

  1. 集群消费
    消费者的一种消费模式。一个 Consumer Group 中的各个 Consumer 实例分摊去消费消息,即一条消息只会投递到一个 Consumer Group 下面的一个实例。
  2. 广播消费
    消费者的一种消费模式。消息将对一 个Consumer Group 下的各个 Consumer 实例都投递一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次

RocketMQ执行流程

  1. 启动 Namesrv,Namesrv起 来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心
  2. Broker 启动,跟所有的 Namesrv 保持长连接,定时发送心跳包
  3. 收发消息前,先创建 Topic 。创建 Topic 时,需要指定该 Topic 要存储在 哪些 Broker上。也可以在发送消息时自动创建Topic
  4. Producer 发送消息
  5. Consumer 消费消息

RocketMQ执行过程

Producer执行流程

  1. Producer 启动时,也需要指定 Namesrv 的地址,从 Namesrv 集群中选一台建立长连接
  2. 生产者端的负载均衡,生产者发送时,会自动轮询当前所有可发送的broker,一条消息发送成功,下次换另外一个broker发送,以达到消息平均落到所有的broker上

上面就是对RocketMQ整体结构的一个简单介绍,下面对RocketMQ中最重要的一个概念进行简单介绍

Message

Message是所有MQ中间件的核心领域,一切都是围绕着这个领域来作设计的;

存储消息

RocketMQ的数据存储结构分为两种:

  1. CommitLog是实际存储信息的单元,顺序进行写入
  2. ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址

刷盘策略分为同步刷盘异步刷盘

同步策略分为同步复制异步复制

  • 最佳实际:异步刷盘+同步复制

使用消息

”对一个应用来说,尽可能只用一个Topic,不同的消息子类型用Tag来标识(每条消息只能有一个Tag),服务器端基于Tag进行过滤,并不需要读取消息体的内容,所以效率很高。发送消息设置了Tag以后,消费方在订阅消息时,才可以利用Tag在Broker端做消息过滤。

其次是消息的Key。对发送的消息设置好Key,以后可以根据这个Key来查找消息。所以这个Key一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便。Broker会创建专门的索引文件,来存储Key到消息的映射,由于是哈希索引,应尽量使Key唯一,避免潜在的哈希冲突

Tag和Key的主要差别是使用场景不同,Tag用在Consumer的代码中,用来进行服务端消息过滤,Key主要用于通过命令行查询消息。“

  • Tag过滤是在broker从Consume Queue中取数据的时候就进行比较,在这一过程中通过比较tag的hashCode值内容进行双重过滤
  • SQL过滤,通过在producer向消息中插入自定义属性和值的方式,在comsumer中使用**MessageSelect.bySql()**的方法进行过滤

如何提高消费速度?

  • 提高消费并行度
    1. 增加Consumer实例数量
    2. 增加单个Consumer中的并行度(增加线程)
  • 批量消费
  • 自定义消费offset逻辑,丢弃一部分消息

参考资料

RocketMQ简介


  TOC