RocketMQ入门
在工作中只是简单的使用了一下kafka进行系统间的交互,并未系统化的学习过消息队列这个中间件。基于开源MQ项目的一个活跃度,选择阿里开源出来的RocketMQ来进行学习,再来就是RocketMQ本身也是用java语言进行开发的一个项目。
安装
安装前提
需要拥有以下软件:
- 操作系统(推荐使用64位)
- jdk1.8 (推荐使用64位的)
- Maven 3.2x
- Git
- 4G以上的空间
安装jdk
- 下载jdk
- 解压
执行
sudo tar -zvxf jdk-8u201-linux-x64.tar.gz
- 修改/etc/profile
JAVA_HOME=/home/java/jdk1.8.0_60
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
-
source /etc/profile
-
检查 java -version
安装Maven
- 下载mave
- 解压、设置环境path、生效
安装Git
- 更新源
apt-get update -y
apt-get upgrade -y
- 安装git
apt install git
- git --version
安装RocketMQ
-
解压
-
修改内存设置
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=32m -XX:MaxMetaspaceSize=64m"
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/home/java/jdk1.8.0_60
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
#[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
- 启动NameServer
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
- 启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
- 发送和接收消息
- Producer
public static void main(String[] args) throws Exception {
// 构造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName12345");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("TopicTest", "TagA",
("Hello RocketMQ!!!" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg,10000);
System.out.println(sendResult);
}
// producer.shutdown();
}
- Consumer
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushMsgGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf(Thread.currentThread().getName() + "Receive New Messages :" + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
上述运行环境是在windows子系统中进行运行的,但是由于网络的问题producer一直连接不上broker,导致实验失败。最后切换到windows环境下,运行倒是成功了,但是由于c盘太小了,每次都要删除commitLog进行运行,实在是太耗费精力和时间了,所以不在环境问题上纠结了。
RocketMQ中的基础概念
-
NameServer
NameServer功能近似于zk,物理逻辑上的broker启动后向NameServer注册。NameServel是一种无状态的,每一个节点保存的都是全量的Broker节点信息,其他节点通过和NameServer保持长连接来获取信息 -
broker
broker类似于物理逻辑上的机器,支持主从部署,是实际于消息体交互的 -
producer
producer消息的提供方 -
comsumer
comsumer消息的消费方
具体的交互逻辑,如下图展示: