> For the complete documentation index, see [llms.txt](https://jun-wang.gitbook.io/learnjava/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://jun-wang.gitbook.io/learnjava/ji-shu-xue-xi/web-zhong-jian-jian-xue-xi/rocketmq-ru-men.md).

# RocketMQ入门

## 1. RocketMQ简介

RocketMQ是阿里开源的消息中间件，它是纯java开发，具有低延迟、高吞吐量、高可用性和适合大规模分布式系统应用的特点。从名字可以看出Rocket火箭，代表RocketMQ主打速度。RocketMQ思路起源于Kafka，它对消息的可靠传输及事务性做了优化。

学习MQ必须知道的几个专业术语：

### 1）Producer

消息生产者，生产者的作用就是将消息发送到 MQ，生产者本身既可以产生消息，如读取文本信息等。也可以对外提供接口，由外部应用来调用接口，再由生产者将收到的消息发送到 MQ。 （Producer Group生产者组，简单来说就是多个发送同一类消息的生产者称之为一个生产者组。）

### 2）Consumer

消息消费者，简单来说，消费 MQ 上的消息的应用程序就是消费者，至于消息是否进行逻辑处理，还是直接存储到数据库等取决于业务需要。(Consumer Group消费者组，和生产者组类似，消费同一类消息的多个 consumer 实例组成一个消费者组。)

### 3）Topic

Topic 是一种消息的逻辑分类，比如说你有订单类的消息，也有库存类的消息，那么就需要进行分类，一个是订单 Topic 存放订单相关的消息，一个是库存 Topic 存储库存相关的消息。

### 4）Message

Message 是消息的载体。一个 Message 必须指定 topic，相当于寄信的地址。Message 还有一个可选的 tag 设置，以便消费端可以基于 tag 进行过滤消息。也可以添加额外的键值对，例如你需要一个业务 key 来查找 broker 上的消息，方便在开发过程中诊断问题。

### 5）Tag

标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。

### 6）Broker

Broker 是 RocketMQ 系统的主要角色，其实就是前面一直说的 MQ。Broker 接收来自生产者的消息，储存以及为消费者拉取消息的请求做好准备。

### 7）Name Server

Name Server 为 producer 和 consumer 提供路由信息。

**RocketMQ架构图：**

![](/files/-M7h5fVTG8n4uzG8JVOf)

## 2. 为什么使用消息队列MQ

使用消息队列主要是为了解决三个问题：松耦合、异步和削峰。下面简单介绍一下这三种作用的应用场景：

### 2.1 松耦合

传统的系统间耦合性太强，系统A在代码中直接调用系统B和系统C的代码，如果将来D系统接入，系统A还需要修改代码，过于麻烦，使用MQ将消息写入消息队列，需要消息的系统自己从消息队列中订阅，从而系统A不需要做任何修改。

### 2.2 异步

一些非必要的业务逻辑以同步的方式运行，太耗费时间。 将消息写入消息队列，非必要的业务逻辑以异步的方式运行，加快响应速度。

### 2.3 削峰

在高并发分布式环境下，在高峰请求的时候，由于来不及处理，请求往往发生堵塞，比如大量的update、insert操作同时到达mysql，直接导致大量的行锁和表锁，甚至触发too many connections错误。通过使用消息队列，我们可以异步处理请求，按照数据处理能力从消息队列中慢慢的拉取消息，在生产中，这个短暂的高峰期积压是允许的。

### 缺点：

引入MQ也会有一些缺点，比如系统的可用性降低，因为多加了一个系统就要保证MQ不出问题，一旦MQ挂了，整个系统也就挂了。

还有就是系统的复杂性提高了，加入MQ之后要考虑很多问题，比如一致性问题，如何保证消息不被重复消费，如何保证消息可靠性传输。

## 3. Linux下安装使用RocketMQ

简单介绍了RocketMQ之后，我们动手安装一下它吧。环境：

```
Ubuntu x86_64 GNU/Linux
java version "1.8.0_181"
Apache Maven 3.5.4 
git version 1.9.1
```

### 3.1 下载代码

```
git clone https://github.com/apache/rocketmq.git
```

### 3.2 使用maven编译打包

由于默认的中央仓库在国外下载东西可能导致失败，可以将其改为国内镜像，编辑maven的配置文件settings.xml，添加：

```markup
<mirror>
    <id>aliyun</id>
    <mirrorOf>central</mirrorOf>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</mirror>
```

然后再编译打包RocketMQ：

```
cd rocketmq/
mvn -Prelease-all -DskipTests clean install -U
```

成功后会显示build success。

### 3.3 启动Name Server

```
cd distribution/target/apache-rocketmq
nohup sh bin/mqnamesrv &
tail -f bin/nohup.out
# 启动成功显示
The Name Server boot success. serializeType=JSON
```

### 3.4 启动Broker

```
nohup sh bin/mqbroker -n localhost:9876 &
tail -f bin/nohup.out
# 启动成功显示
The broker[**, **.**.**.**:10911] boot success. serializeType=JSON and name server is localhost:9876
```

### 3.5 测试发送和接收消息

在发送和接收之前我们需要告诉客户端name servers的地址，RocketMQ提供了很多种方式来实现，为了演示方便我们使用环境变量`NAMESRV_ADDR`。

```
 export NAMESRV_ADDR=localhost:9876
 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 # 可以看到有很多条消息已经发送成功了
 SendResult [sendStatus=SEND_OK, msgId= ...
 SendResult [sendStatus=SEND_OK, msgId= ...
 SendResult [sendStatus=SEND_OK, msgId= ...

 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 # 可以看到刚才发送的消息已经成功被消费者消费了
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...
```

### 3.6 关闭服务

```
sh bin/mqshutdown broker
# 关闭成功后显示
The mqbroker(8916) is running...
Send shutdown request to mqbroker(8916) OK


sh bin/mqshutdown namesrv
# 关闭成功后显示
The mqnamesrv(8892) is running...
Send shutdown request to mqnamesrv(8892) OK
```

## 4. 在spring中使用RocketMQ

### 4.1 在pom中进行依赖

```markup
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>
```

### 4.2 生产者生产数据

RocketMQ提供了三种生产数据的模式：

* 同步模式：消息发送方发出数据后，会在收到接收方发出响应之后才发下一个数据包。同步模式应用场景比较广泛，比如短信通知。
* 异步模式：消息发送方发出数据后，不等待接收方发出响应就发出下一个数据包，提供回调函数。异步模式一般用于对响应时间比较敏感的场景。
* 单向模式：消息发送方发出消息后不等待服务器响应也没有回调函数。单向模式用于对可靠性要求一般的情况，比如日志收集。

三种模式的比较：

| 发送方式 | 发送TPS | 发送结果反馈 | 可靠性  |
| ---- | ----- | ------ | ---- |
| 同步   | 快     | 有      | 不丢失  |
| 异步   | 快     | 有      | 不丢失  |
| 单向   | 最快    | 无      | 可能丢失 |

**4.2.1 同步模式示例代码**

```java
/*
* 同步生产消息l
*/
public void syncProducer() throws Exception {
    // 使用生产者组名初始化一个生产者
    DefaultMQProducer producer = new DefaultMQProducer(Constants.MQ_GROUP_NAME);
    // 指定name server的地址
    producer.setNamesrvAddr(Constants.MQ_NAMESERVER_ADDRESS);
    // 启动实例
    producer.start();

    for (int i = 0; i < 5; i++) {
        Message msg = new Message("TopicTest", "TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(msg);
        logger.info("生产者同步生产的消息:{}", sendResult.toString());
    }

    // 及时关闭生产者
    producer.shutdown();
}
```

**4.2.2 异步模式示例代码**

```java
/*
* 异步生产消息
*/
public void asyncProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer(Constants.MQ_GROUP_NAME);
    producer.setNamesrvAddr(Constants.MQ_NAMESERVER_ADDRESS);
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);

    for (int i = 0; i < 5; i++) {
        final int index = i;
        Message msg = new Message("TopicTest", "TagA", "OrderID188", ("Hello world" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                logger.info("index:" + index + "msgId:" + sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable e) {
                logger.error("index:" + index + e.toString());
                e.printStackTrace();
            }
        });
    }

    producer.shutdown();
}
```

**4.2.3 单向模式示例代码**

```java
/*
* 单向生产消息
*/
public void oneWayProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer(Constants.MQ_GROUP_NAME);
    producer.setNamesrvAddr(Constants.MQ_NAMESERVER_ADDRESS);
    producer.start();

    for (int i = 0; i < 5; i++) {
        Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
        producer.sendOneway(msg);
        logger.info("生产者单向生产的消息:" + msg.toString());
    }

    producer.shutdown();
}
```

### 4.3 消费者消费数据

```java
public void consume() throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(Constants.MQ_GROUP_NAME);
    consumer.setNamesrvAddr(Constants.MQ_NAMESERVER_ADDRESS);
    //消费指定的topic
    consumer.subscribe("TopicTest", "*");

    //注册消费者回调函数
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            logger.info("消费的消息条数:" + msgs.size());
            for(MessageExt msg: msgs) {
                logger.info("消费的消息:" + msg.toString());
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();
    logger.info("Consumer Started...");
}
```

## 5. 遇到的问题

### 5.1  启动服务时报内存不足

执行

```
nohup sh mqnamesrv &
```

时，报错

```
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 2147483648 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/wangjun/rocketmq/distribution/bin/hs_err_pid8300.log
```

#### 解决方案

修改rocketmq/distribution/target/apache-rocketmq/bin/runserver.sh和runbroker.sh。

将

```
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
```

改为：

```
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
```

可以看出之前的默认设置是4g内存，如果你的机器没有这么大只是自己搭建着玩就把它设置小一点就行了。

顺便将tools.sh的内存也改成256m，不然运行消息的发送和接收的demo的时候也会报错。

有的时候抛出的错误`Native memory allocation (mmap) failed to map 541255 bytes for committing reserved memory.`很小，有可能就是内存不够，尝试关闭了tomcat，然后就可以了。

**PS：网上很多教程因为时间和版本问题有各种问题，最稳妥的办法就是看官方教程，即使英文不好也好坚持看，必定事半功倍**

> 参考：
>
> 消息队列系列（1）为什么使用MQ：<https://blog.csdn.net/dadadie/article/details/51553780>
>
> \*为什么要是用消息队列以及消息队列的优缺点分析：<https://blog.csdn.net/alinshen/article/details/80583214>
>
> \*RocketMQ 实战之快速入门：<https://www.jianshu.com/p/824066d70da8>
>
> 官网教程：<http://rocketmq.apache.org/docs/quick-start/>
>
> 阿里云文档：<https://help.aliyun.com/product/29530.html?spm=a2c4g.750001.15.2.11df7b13oDiOUp>
>
> Kafka、RabbitMQ、RocketMQ等消息中间件的对比：<https://blog.csdn.net/yunfeng482/article/details/72856762>
>
> 阿里中间件，十分钟入门RocketMQ：<http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/>
