RocketMQ入门


RocketMQ入门

在工作中只是简单的使用了一下kafka进行系统间的交互,并未系统化的学习过消息队列这个中间件。基于开源MQ项目的一个活跃度,选择阿里开源出来的RocketMQ来进行学习,再来就是RocketMQ本身也是用java语言进行开发的一个项目。

安装

安装前提

需要拥有以下软件:

  • 操作系统(推荐使用64位)
  • jdk1.8 (推荐使用64位的)
  • Maven 3.2x
  • Git
  • 4G以上的空间

安装jdk

  1. 解压
    执行
sudo tar -zvxf jdk-8u201-linux-x64.tar.gz
  1. 修改/etc/profile
JAVA_HOME=/home/java/jdk1.8.0_60
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
  1. source /etc/profile

  2. 检查 java -version

安装Maven

  1. 解压、设置环境path、生效

安装Git

  • 更新源
apt-get update -y
apt-get upgrade -y
  • 安装git
apt install git
  • git --version

安装RocketMQ

  1. 解压

  2. 修改内存设置

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=32m -XX:MaxMetaspaceSize=64m"

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/home/java/jdk1.8.0_60
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
#[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"


JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
  1. 启动NameServer
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
  1. 启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log 
  1. 发送和接收消息
  • Producer
    public static void main(String[] args) throws Exception {
        // 构造Producer
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName12345");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TopicTest", "TagA",
                    ("Hello RocketMQ!!!" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg,10000);
            System.out.println(sendResult);
        }
//        producer.shutdown();
    }
  • Consumer
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushMsgGroup");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        System.out.printf(Thread.currentThread().getName() + "Receive New Messages :" + msgs + "%n");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
}

上述运行环境是在windows子系统中进行运行的,但是由于网络的问题producer一直连接不上broker,导致实验失败。最后切换到windows环境下,运行倒是成功了,但是由于c盘太小了,每次都要删除commitLog进行运行,实在是太耗费精力和时间了,所以不在环境问题上纠结了。

RocketMQ中的基础概念

  • NameServer
    NameServer功能近似于zk,物理逻辑上的broker启动后向NameServer注册。NameServel是一种无状态的,每一个节点保存的都是全量的Broker节点信息,其他节点通过和NameServer保持长连接来获取信息

  • broker
    broker类似于物理逻辑上的机器,支持主从部署,是实际于消息体交互的

  • producer
    producer消息的提供方

  • comsumer
    comsumer消息的消费方

具体的交互逻辑,如下图展示:
RocketMQ架构图

RocketMQ架构图

参考文章
RocketMQ入门篇
windows下安装rocketMQ


  TOC