使用Docker安装
拉取镜像(打开管理界面镜像):
docker pull rabbitmq:management
启动容器(设置账号密码):
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 --hostname my-rabbit --name my-rabbitmq rabbitmq:management
检查是否启动成功:
docker ps
后台管理页面
访问:http://localhost:15672 ,账号密码为自己启动容器设置的;如未设置默认账号密码都为guest
点击Admin页签可以创建自己的用户
用户标签说明
- administrator:用户可以完成监视所能做的一切,管理用户、虚拟主机和权限,关闭其他用户的连接,以及管理所有虚拟主机的策略和参数。
- monitoring:用户可以访问管理插件并查看所有连接和通道以及节点相关信息。
- policymaker:用户可以访问管理插件,并管理他们有权访问的vhost的策略和参数。
- management:用户可以访问管理插件
- none:用户无权限,不能访问management plugin
消息中间件协议
是在TCP/IP协议基础上构建的一种约定成俗的规范和机制,主要目的是可以让不同的客户端进行通讯,并且这种协议下的规范更具有:持久性、高可用和高可靠的性能
AMQP协议
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
支持的MQ
MQTT协议
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
支持的MQ
Kafka协议
Kafka协议是基于TCP/IP的一种二进制协议,消息内部是通过长度进行分割,由基本数据类型组成
支持的MQ
消息分发策略
MQ/策略 | 发布订阅 | 轮询分发 | 公平分发 | 重试 | 消息拉取 |
---|---|---|---|---|---|
RabbitMQ | Yes | Yes | Yes | Yes | Yes |
RocketMQ | Yes | No | No | Yes | Yes |
ActiveMQ | Yes | Yes | No | Yes | No |
Kafka | Yes | Yes | Yes | No | Yes |
RabbitMQ核心组成
- Server:又称Broker,接受客户端的请求,实现AMQP实体服务
- Connection:连接,应用程序与Broker的网络连接:TCP/IP 三次握手四次挥手
- Channel:网络信道,几乎所有的操作都在Channel中进行,是消息读写通道,客户端可以建立各自的Channel,每个Channel代表一个会话任务
- Message:消息,服务与应用程序之间传送的数据,由Properties和Body组成;Properties可对消息进行修饰,比如消息的优先级,延迟 等高级特性,Body就是消息体
- Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange
- Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列上
- Bindings:Exchange和Queue之间的虚拟连接,可以保护多个Routing key
- Routing key:路由规则,虚拟机可以用它来确定Ruhr路由一个特定消息
- Queue:队列,消息队列,保存消息并将它们转发给消费者
RabbitMQ模式(页面测试)
一、简单模式
首先创建一个队列
点击进去操作队列
再发送一个消息
随机预览消息(预览消息完不会删除消息,切换Ack Mode
为Automatic ack
即可)
二、工作模式
工作模式又分别有两种模式:
- 轮询分发模式:一个消费者一条,按均分配
- 公平分发模式:根据消费者的能力进行公平分发,按劳分配
三、发布/订阅模式
再创建一个队列
创建一个订阅/发布类型的交换机
再来进行绑定队列
测试在交换机中发送消息
可以切换到Queues页面,看到这两个被绑定的队列都有了消息
四、路由模式
创建一个路由类型的交换机
再来进行绑定队列(注意填写Routing key)
测试在交换机中指定Routing key:queue01 ---> test_queue01
发送消息
可以切换到Queues页面,看到这只有test_queue01
有消息
五、主题模式
六、RPC模式
七、确认模式
代码实现
一、初始化SpringBoot工程
创建项目
引入相关pom
1
2
3
4
5
6
7
8
9<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>增加yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28server:
port: 5921
# 应用名称
spring:
application:
name: spring-boot-rabbitmq-study
rabbitmq:
# 主机地址
host: 127.0.0.1
# 主机端口
port: 5672
# 用户账号
username: admin
# 用户密码
password: 123456
# 虚拟主机
virtual-host: /
listener:
simple:
# ACK确认策略:手动确认
acknowledge-mode: manual
retry:
# 开启重试
enabled: true
# 重试次数
max-attempts: 3
# 重试间隔
initial-interval: 2000ms定义消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72package cn.running.study.service;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.Charset;
/**
* TestConsumer
*
* @author chenghao
* @since 2022-06-02
*/
public class TestConsumer {
/**
* 处理消息:fan_queue01
*
* @param message 消息属性实体
* @param channel 通道
*/
public void handleQueue01(Message message, Channel channel) {
System.out.println("handleQueue01:" + new String(message.getBody()));
try {
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 处理消息:fan_queue02
*
* @param message 消息属性实体
* @param channel 通道
*/
public void handleQueue02(Message message, Channel channel) {
System.out.println("handleQueue02:" + new String(message.getBody()));
try {
// 确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
5. 定义生产者
```java
/**
* 发送消息
*/
public void sendMessage() {
// 转换消息并发送
rabbitTemplate.convertAndSend("fan_exchange", "", "Hello!fan_exchange!!!");
}
死信队列
DLX,称之为死信队列交换机;当一个消息在队列中变成死信之后,会被重新发送到另一个交换机中
产生死信消息的原因:
- 消息过期
- 消息被拒绝
- 消息队列已满
使用死信队列需要使用一个队列参数x-dead-letter-exchange
指定交换机