0%

RabbitMQ笔记(狂神)

狂神RabbitMQ教学视频学习笔记,包括消息队列介绍、RabbitMQ安装、RabbitMQ的5中订阅和发布的模式Demo等内容

[1] 主体是狂神说系列的RabbitMQ教程https://www.bilibili.com/video/BV1dX4y1V73G?p=1

[2] 参考了cyc2018关于消息中间件的文章http://www.cyc2018.xyz

[3] 参考了JavaGuide主页的链接文章https://snailclimb.gitee.io/javaguide/#/

1 消息队列和RabbitMQ

1.1 消息队列介绍

消息系统允许软件、应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用。通俗来讲,消息队列用来处理分布式应用各个系统之间的通信问题。(举几个例子?搞项目之后回来再看)

消息模型一般分两种:点对点 和 发布/订阅

  • 点对点

消息生产者向消息队列中发送了一个消息之后,只能被一个消费者消费一次。

img

  • 发布/订阅

消息生产者向频道发送一个消息之后,多个消费者可以从该频道订阅到这条消息并消费。(发布/订阅和设计模式中的观察者模式不同,是异步的并且通过频道通信)

img

1.2 消息队列使用场景【重要】

消息队列带来的优点就是 异步、削峰和解耦,这三点同样也是它的应用场景

异步处理

发送者将消息发送给消息队列之后,不需要同步等待消息接收者处理完毕,而是立即返回进行其它操作。消息接收者从消息队列中订阅消息之后异步处理。

举个🌰,比如我们有一个购票系统,需求是用户在购买完之后能接收到购买完成的短信和邮件。

省略中间的网络通信时间消耗,假设购票系统处理需要 150ms ,短信系统处理需要 200ms,邮件系统处理需要 200ms

  • 如果采用同步的处理:需要等待短信、邮件都处理啊完毕后,才表示购票成功
    • 所需时间150ms + 200ms + 200ms = 550ms

img

  • 如果采用异步的处理,在购票系统和短信、邮件系统中增加一个中间件——消息队列。我们将消息存入消息队列之后,就代表购票成功、
    • 150ms + 10ms = 160ms。

img

流量削锋

在高并发的场景下,如果短时间有大量的请求到达会压垮服务器。

可以将请求发送到消息队列中,服务器按照其处理能力从消息队列中订阅消息进行处理。

还是购票系统的栗子:

主业务购票系统的流量承受能力较强、但短信和邮件系统一般配备的硬件资源不会太高,无法承受和购票系统一样的流量压力。

使用消息队列可以将购买完成的信息发送到消息队列中,短信、邮件系统 尽自己所能地去消息队列中取消息和消费消息

img

应用解耦

如果模块之间不直接进行调用,模块之间耦合度就会很低,那么修改一个模块或者新增一个模块对其它模块的影响会很小,从而实现可扩展性。

通过使用消息队列,一个模块只需要向消息队列中发送消息,其它模块可以选择性地从消息队列中订阅消息从而完成调用。

1.3 RabbitMQ介绍

ActiveMQ、 RabbitMQ、 Kafka、RocketMQ都是不同的消息中间件

RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,用于在分布式系统中存储转发消息。

RabbitMQ的优点

  • 可靠性: RabbitMQ使用一些机制来保证消息的可靠性,如持久化、传输确认及发布确认等。
  • 灵活的路由:消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。
  • 扩展性: 多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
  • 高可用性: 在同一个集群里,队列可以被镜像到多个机器中,以确保当其中某些硬件出现故障后,你的消息仍然安全。

RabbitMQ 的整体架构

图1-RabbitMQ 的整体模型架构

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。

  • Producer(生产者) :生产消息的一方(邮件投递者)

  • Consumer(消费者) :消费消息的一方(邮件收件人)

  • Exchange(交换器) :用来接收生产者发送的消息并将这些消息路由给服务器中的队列中。RabbitMQ 有四种交换器类型。

  • Queue(消息队列) :用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。

  • RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者RabbitMQ服务实例。

2 RabbitMQ安装

RabbitMQ官网查看版本支持:https://www.rabbitmq.com/which-erlang.html

erlang 官网:https://www.erlang.org (不要在这里下载)

下载rabbitmq-server-3.8.19-1.el7.noarch.rpm:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.19

下载erlang-solutions-2.0-1.noarch.rpm https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm

环境准备:CentOS7.x 安装路径 /usr/rabbitmq

2.1 安装Erlang和RabbitMQ

Erlang

1
2
3
4
5
6
7
8
9
mkdir -p /usr/rabbitmq #再安装路径下创建文件夹
cd /usr/rabbitmq #进入安装文件夹
#使用 XFTP将两个安装包拷贝进rabbitmq文件夹下
ls
# erlang-solutions-2.0-1.noarch.rpm
# rabbitmq-server-3.8.19-1.el7.noarch.rpm
rpm -Uvh erlang-solutions-2.0-1.noarch.rpm
yum install -y erlang
erl -v

RabbitMQ

1
2
3
4
5
6
7
8
9
10
yum install -y socat
rpm -Uvh rabbitmq-server-3.8.19-1.el7.noarch.rpm #没用?
# 启动服务
systemctl start rabbitmq-server
# 查看服务状态,如图
systemctl status rabbitmq-server.service
# 开机自启动
systemctl enable rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
image-20210708005845710

2.2 RabbitMQWeb管理界面

1 安装web端插件

2 重启服务,开放阿里云的安全组和防火墙

3 浏览器访问初始界面

4 授权新的账号和密码

  • 新增用户
  • 设置用户的操作权限
  • 为用户添加资源权限

5 网页登录成功

1 默认情况下,是没有安装web端的客户端插件,需要安装才可以生效

1
2
rabbitmq-plugins enable rabbitmq_management
# 说明:rabbitmq有一个默认账号和密码是:`guest`默认情况只能在 localhost本计下访问,所以需要添加一个远程登录的用户

2 安装完毕以后,重启服务即可

1
systemctl restart rabbitmq-server #重启服务
  • 一定要记住,在对应服务器(阿里云)的安全组、防火墙中开放15672端口

3 在浏览器访问 http://47.98.220.123:15672/

4 授权账号和密码

  • 新增用户
1
rabbitmqctl add_user admin admin
  • 设置用户分配操作权限
1
2
3
4
5
6
rabbitmqctl set_user_tags admin administrator
# 用户级别:
# administrator:可以登录控制台、查看所有信息、可以对 rabbitmq进行管理
# monitoring:监控者 登录控制台,查看所有信息
# policymaker:策略制定者 登录控制台,指定策略
# managment 普通管理员 登录控制台,只能看自己的信息
  • 为用户添加资源权限
1
rabbitmqctl set_permissions -p / admin ".*"".*"".*"

5 网页登录成功 设置的账号密码均为admin

image-20210708010313311

3 RabbitMQ五种工作模式

生产者、消费者是分别两个项目、通过RabbitMQ这个消息中间件的通道进行连接。RabbitMQ的后台页面上显示队列等中间件里面所有的全部信息。

先搭建项目环境

1 新建一个空项目rabbitmq

2 空项目下新建一个springboot项目rabbitmq-provider,勾选Web 和 RabbitMQ支持

3 配置rabbitmq-provider的配置文件application.yaml

4 导入依赖lombok\fastjson,编写实体类Vehicle表示车辆

5 重复234的操作,新建一个rabbitmq-consumer项目

6 阿里云开启防火墙和安全组的5672和5673端口!!【重要】

1
2
3
4
5
6
7
8
9
10
11
# 服务端口
server:
port: 8080
spring:
#我这里仅写了ip 其余端口账号密码由于是演示 采用默认即可,不必要写
rabbitmq:
host: 47.98.220.123
port: 5672
virtual-host: /
username: admin
password: admin

1、简单队列(Easy)

  img

  一个生产者对应一个消费者!!!

生产者:

1 建easy包 下写EasyRabbitConfig配置文件,给队列命名

2 写EasyProviderServer类,发送JSON格式的信息

3 测试类中调用Server类中的方法,运行之。查看rabbitMQ后台页面

1 建easy包 下写EasyRabbitConfig配置文件

1
2
3
4
5
6
7
@Configuration
public class EasyRabbitConfig {
@Bean
public Queue easyQueue() {
return new Queue("rabbit_easy_queue");
}
}

2 写EasyProviderServer类,业务实现后发送JSON格式的信息

1
2
3
4
5
6
7
8
9
10
11
12
@Service
public class EasyProviderServer {
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendEasyMessage(){
for (int i=0;i<10;i++){
String s = JSON.toJSONString(new Vehicle(i, i + "easy车车"));
rabbitTemplate.convertAndSend("rabbit_easy_queue",s);
}
}
}

3 测试类中调用Server类中的方法

1
2
3
4
5
6
7
8
9
@SpringBootTest
class RabbitmqProviderApplicationTests {
@Autowired
EasyProviderServer easyProviderServer;
@Test
void contextLoads() {
easyProviderServer.sendEasyMessage();
}
}

image-20210708105408993

消费者:

1 建easy包 下写EasyRabbitConfig配置文件,给队列命名

2 写EasyConsumer类,接受message信息,打印车辆信息

3 运行项目,消费队列中的信息

// 如果报错记得去rabbitmq控制页面Admin下设置admin的权限,点一下就好!!

1 建easy包 下写EasyRabbitConfig配置文件,给队列命名

1
2
3
4
5
6
7
@Configuration
public class EasyRabbitConfig {
@Bean
public Queue easyQueue() {
return new Queue("rabbit_easy_queue");
}
}

2 写EasyConsumer类,接受message信息

1
2
3
4
5
6
7
@Component
public class EasyConsumer {
@RabbitListener(queues = "rabbit_easy_queue")
public void process(Message message) {
System.out.println("easy模式:消费者接收到车辆消息: " + JSON.parseObject(new String(message.getBody()), Vehicle.class));
}
}

3 运行项目,发现队列清空了

image-20210708124038722 image-20210708124053092

2、work 模式

img

  一个生产者对应多个消费者,但是一条消息只能有一个消费者获得消息!!!

生产者:和easy的完全一致,这里新建一个work包

1 WorkRabbitConfig配置文件

1
2
3
4
5
6
7
8
@Configuration
public class WorkRabbitConfig {
//work的神生产者和easy一样
@Bean
public Queue workQueue() {
return new Queue("rabbit_work_queue");
}
}

2 写WorkProviderServer类

1
2
3
4
5
6
7
8
9
10
11
@Service
public class WorkProviderServer {
//work模式的生产者和easy的一样
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendWorkMessage() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("rabbit_work_queue", JSON.toJSONString(new Vehicle(i,i+"work车车")));
}
}
}

3 测试类中增加一个workTest()方法,运行之

1
2
3
4
5
6
@Autowired
WorkProviderServer workProviderServer;
@Test
void workTest() {
workProviderServer.sendWorkMessage();
}

image-20210708130550835

消费者:与easy中也很相似,就是WorkConsumer类下多写几个消费方法!!很简单

1 写WorkRabbitConfig配置

1
2
3
4
5
6
7
@Configuration
public class WorkRabbitConfig {
@Bean
public Queue workQueue() {
return new Queue("rabbit_work_queue");
}
}

2 写WorkConsumer类,下面含三个方法即三个消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class WorkConsumer {
@RabbitListener(queues = "rabbit_work_queue")
public void work1(Message message) {
System.out.println("消费者1--work--接收到车辆消息: " + JSON.parseObject(message.getBody(), Vehicle.class));
}
@RabbitListener(queues = "rabbit_work_queue")
public void work2(Message message) {
System.out.println("消费者2--work--接收到车辆消息: " + JSON.parseObject(message.getBody(),Vehicle.class));
}

@RabbitListener(queues = "rabbit_work_queue")
public void work3(Message message) {
System.out.println("消费者3--work--接收到车辆消息: " + JSON.parseObject(message.getBody(),Vehicle.class));
}
}

3 运行项目,发现队列清空了

image-20210708130948085 image-20210708131013920

3、发布/订阅模式(Fanout)

  img

一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。

ps:X表示交换器,在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers,这里的交换器是 fanout。

这里演示一个交换机绑定两个队列,每个队列分别被两个消费者监听的情况!!

生产者:与work模式的不同在于FanoutExchangeConfig 中需要定义交换机并将队列与交换机绑定。这里新建一个fanout包

1 FanoutConfig配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* @desc 发布订阅模式 配置两个队列一个交换机
*/
@Configuration
public class FanoutConfig {
// 队列一
@Bean
public Queue FanoutQueueOne() {
return new Queue("rabbit_fanout_queue_one");
}
//队列二
@Bean
public Queue FanoutQueueTwo() {
return new Queue("rabbit_fanout_queue_two");
}

//交换机 声明为FanoutExchange类型
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_exchange");
}

/**
* 绑定队列一到交换机
* @param FanoutQueueOne 上方定义的队列一方法名 根据此方法名参数 器会自动注入对应bean
* @param fanoutExchange 上方定义的交换机方法名
* @return
*/
@Bean
public Binding bindingFanoutExchangeA(Queue FanoutQueueOne, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(FanoutQueueOne).to(fanoutExchange);
}

/**
* 绑定队列二到交换机
* @param FanoutQueueTwo 上方定义的队列二方法名 根据此方法名参数 器会自动注入对应bean 当
* 然也可以省略参数 直接在bind中指定队列构建方法名 例如 FanoutQueueTwo()
*
* @param fanoutExchange 上方定义的交换机方法名
* @return
*/
@Bean
public Binding bindingFanoutExchangeB(Queue FanoutQueueTwo, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(FanoutQueueTwo).to(fanoutExchange);
}
}

2 FanoutProviderServer类

1
2
3
4
5
6
7
8
9
10
11
@Service
public class FanoutProviderServer {
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendFanoutMessage() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("fanout_exchange","", JSON.toJSONString(new Vehicle(i,i+"发布订阅车车")));
}
}
}

3 测试方法

1
2
3
4
5
6
@Autowired
FanoutProviderServer fanoutProviderServer;
@Test
void fanoutTest() {
fanoutProviderServer.sendFanoutMessage();
}

消费者:与work的不同在于出现了队列!!每个队列有属于自己的消费者

1 FanoutConfig与生产者相同

2 FanoutConsumer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class FanoutConsumer {
@RabbitListener(queues = "rabbit_fanout_queue_one")
public void consumerOne(Message message) {
System.out.println("rabbit_fanout_queue_one队列 消费者1:收到消息---" + JSON.parseObject(new String(message.getBody()), Vehicle.class));
}
@RabbitListener(queues = "rabbit_fanout_queue_one")
public void consumerOne2(Message message) {
System.out.println("rabbit_fanout_queue_one队列 消费者2:收到消息---" + JSON.parseObject(new String(message.getBody()),Vehicle.class));
}
//-------------一个队列绑定两个消费者 --------------------------------
@RabbitListener(queues = "rabbit_fanout_queue_two")
public void consumerTwo(Message message) {
System.out.println("rabbit_fanout_queue_two队列 消费者1:收到消息---" + JSON.parseObject(new String(message.getBody()),Vehicle.class));
}
@RabbitListener(queues = "rabbit_fanout_queue_two")
public void consumerTwo2(Message message) {
System.out.println("rabbit_fanout_queue_two队列 消费者2:收到消息---" + JSON.parseObject(new String(message.getBody()),Vehicle.class));
}
}

3 运行

image-20200715222028261

?交换机中有10辆车,结果是:每个队列中都有10辆车、然后每个消费者只收到5个订阅?(有点不太符合常理、队列的里面也应该只有5辆车才对啊)

4、路由模式(Direct)

  img

生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。

也就是让消费者有选择性的接收消息。

生产者:

消费者:

5、主题模式(Topic)

 img

上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。

符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。

生产者:

消费者:

4 RabbitMQ进阶

(下次一定,目前学会整合RabbitMQ就行07/08)

-------------感谢阅读没事常来-------------