RocketMQ组件
RocketMQ简单的看是由四个组件构成的,分别是NameServe、Broker、Producer、consumer这四个组件构成的;
一个简单的执行过程是:
- consumer通过向NameServer询问具体的Topic所在的Broker地址
- consumer通过向Broker保持长连接的形式,获取到信息
- 同一组下的consumer会向broker中的不同MessageQueue根据offset的位置进行获取msg,offeset会根据不同的模式有不同的实现,如果是广播模式offset会保存在不同的consumer中,如果是集群模式offset是保存在broker中的
下面依次来介绍一下这几个组件
NameServer
NameServer是整个消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息。同时,各个角色的机器都要定期向NameServer上报自己的状态,超时不上报的话,NameServer会认为某个机器出故障不可用了,其他的组件会把这个机器从可用列表里移除。
NameServer本身是无状态的,也就是说NameServer中的Broker、Topic等状态信息不会持久存储,都是由各个角色定时上报并存储到内存中的
NameServer和zk的对比:
- NameServer只是作为一个轻量级的元数据服务器
- zk是一个分布式应用程序提供协调服务
RocketMQ的NameServer只有很少的代码,容易维护,所以不需要再依赖另一个中间件,从而减少整体维护成本
Broker
Broker主要负责消息的存储、传递和查询,以及服务的高可用保证。在Broker的Master-Slave架构中,Broker分为Master和Slave。一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master和Slave的对应关系是通过指定相同的BrokerName和不同的BrokerId来定义的。BrokerId 为 0 表示 Master,非 0 表示 Slave;
- ROCKETMQ 架构总结
每个Broker都与NameServer集群中的所有节点建立长期连接,并定期向所有 NameServer 注册 Topic 信息。Producer与NameServer集群中的一个节点建立长连接,定时从NameServer获取topic路由信息,与提供Topic服务的Master建立长连接,定时向Master发送心跳。Producer是完全无状态的。Consumer与NameServer集群中的一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,与提供 Topic 服务的 Master 和 Slave 建立长连接,定期向 Master 和 Slave 发送心跳。Consumer 从 Master 订阅 Topic或奴隶。
Producer
Producer是作为数据源,将消息优化、写入和发布到一个或多个主题。生产者通过 MessageQueue 在代理之间负载平衡数据。它支持快速失败和发送消息期间的重试。
Consumer
Consumer是通过订阅Topic来读取消息,有两种消费的方式服务端推送或者客户端拉取;消费者需要在消费的时候作业务幂等和可靠性保证;
- 执行过程
- Consumer 启动时需要指定 Namesrv 地址,与其中一个 Namesrv 建立长连接。消费者每隔 30 秒从 Namesrv 获取所有Topic 的最新队列情况
- 消费者端的负载均衡。根据消费者的消费模式不同,负载均衡方式也不同。
消费者消费模式
- 集群消费
消费者的一种消费模式。一个 Consumer Group 中的各个 Consumer 实例分摊去消费消息,即一条消息只会投递到一个 Consumer Group 下面的一个实例。 - 广播消费
消费者的一种消费模式。消息将对一 个Consumer Group 下的各个 Consumer 实例都投递一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次
RocketMQ执行流程
- 启动 Namesrv,Namesrv起 来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心
- Broker 启动,跟所有的 Namesrv 保持长连接,定时发送心跳包
- 收发消息前,先创建 Topic 。创建 Topic 时,需要指定该 Topic 要存储在 哪些 Broker上。也可以在发送消息时自动创建Topic
- Producer 发送消息
- Consumer 消费消息
Producer执行流程
- Producer 启动时,也需要指定 Namesrv 的地址,从 Namesrv 集群中选一台建立长连接
- 生产者端的负载均衡,生产者发送时,会自动轮询当前所有可发送的broker,一条消息发送成功,下次换另外一个broker发送,以达到消息平均落到所有的broker上
上面就是对RocketMQ整体结构的一个简单介绍,下面对RocketMQ中最重要的一个概念进行简单介绍
Message
Message是所有MQ中间件的核心领域,一切都是围绕着这个领域来作设计的;
存储消息
RocketMQ的数据存储结构分为两种:
- CommitLog是实际存储信息的单元,顺序进行写入
- ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址
刷盘策略分为同步刷盘、异步刷盘
同步策略分为同步复制、异步复制
- 最佳实际:异步刷盘+同步复制
使用消息
”对一个应用来说,尽可能只用一个Topic,不同的消息子类型用Tag来标识(每条消息只能有一个Tag),服务器端基于Tag进行过滤,并不需要读取消息体的内容,所以效率很高。发送消息设置了Tag以后,消费方在订阅消息时,才可以利用Tag在Broker端做消息过滤。
其次是消息的Key。对发送的消息设置好Key,以后可以根据这个Key来查找消息。所以这个Key一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便。Broker会创建专门的索引文件,来存储Key到消息的映射,由于是哈希索引,应尽量使Key唯一,避免潜在的哈希冲突
Tag和Key的主要差别是使用场景不同,Tag用在Consumer的代码中,用来进行服务端消息过滤,Key主要用于通过命令行查询消息。“
- Tag过滤是在broker从Consume Queue中取数据的时候就进行比较,在这一过程中通过比较tag的hashCode值和内容进行双重过滤
- SQL过滤,通过在producer向消息中插入自定义属性和值的方式,在comsumer中使用**MessageSelect.bySql()**的方法进行过滤
如何提高消费速度?
- 提高消费并行度
- 增加Consumer实例数量
- 增加单个Consumer中的并行度(增加线程)
- 批量消费
- 自定义消费offset逻辑,丢弃一部分消息