RocketMQ中的通信模式
在RocketMQ中为producer提供了三种通讯模式分别是同步、异步、单向;
同时对于消息类型也提供了多种选择:广播消息、延迟消息、
顺序消息、事务消息等选择;
下文就主要介绍一些概念性的东西和最佳实践
producer
同步模式
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer(MqConfig.GROUP_ID, getAclRPCHook());
producer.setNamesrvAddr(MqConfig.NAMESRV_ADDR);
producer.start();
for (int i = 0; i < 1; i++) {
try {
Message msg = new Message(MqConfig.TOPIC,
MqConfig.TAG,
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
// producer.shutdown();
}
使用DefaultMQProducer.send(msg)发送消息就是同步的,向broker发送消息后就立即返回
异步模式
try {
Message msg = new Message(MqConfig.TOPIC,
MqConfig.TAG,
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(Thread.currentThread().getName() + ":" + sendResult.getSendStatus());
}
@Override
public void onException(Throwable e) {
System.out.println(Thread.currentThread().getName() + ":" + e.getMessage());
}
});
} catch (Exception e) {
e.printStackTrace();
}
使用producer.send(msg, new SendCallback()),传入回调函数,这样发送的消息是一个异步的
单向消息
producer.sendOneway(msg);
**producer.sendOneway(msg)**发送的是单向消息
Message
延迟消息
- Message.java
public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
RocketMQ默认只支持固定的延迟级别,如果需要做到自定义延迟级别需要进行扩展。可以用长轮询+短轮询的方式实现
顺序消息
由于在一个Topic下会有多个MessageQueue,因此当同一组下的多个消费者线程来消费时,数据消费就有可能是无序的。
int orderId = i;
Message msg = new Message(MqConfig.TOPIC,
MqConfig.TAG,
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setDelayTimeLevel(1);
producer.sendOneway(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
- 有序消息- consumer
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
通过orderId将所属同一批有顺序的msg发送到同一个msssageQueue中,来保证消费的消息的有序
事务消息
mq实现本地事务主要是将投递消息完成的动作和事务执行成功的动作结合在一起形成一个原子操作。当本地事务执行成功就投递事务完成的消息。
- 事务消息:消息队列 RocketMQ 版提供类似 X/Open XA 的分布式事务功能,通过消息队列 RocketMQ 版事务消息能达到分布式事务的最终一致。
- 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列 RocketMQ 版服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
- 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。
最佳实践
Topic 与 Tag
-
Topic消息主题,通过 Topic 对不同的业务消息进行分类
-
Tag消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性
消费幂等
因为排除消息的重复并不属于mq的职责,而是业务端需要进行处理的。处理重复消息主要是用业务幂等的方式来进行处理;
还有一种记录重复消息消费的方案是使用日志表来进行处理。
订阅关系
订阅关系一致指的是同一个消费者 Group ID 下所有 Consumer 实例的处理逻辑必须完全一致。具体来说就是同一个消费者 Group ID 下所有的实例需在:
- 订阅的 Topic 必须一致
- 订阅的 Topic 中的 Tag 必须一致
因为这样才能按照广播或者集群的消费模式去共同消费同一批的消息。
简单的来说就是我们的消费者必须是一个无状态的服务;