RabbitMQ学习日志
本篇文章主要介绍了RabbitMQ这种消息队列,从消息队列的概念、应用场景、安装方式到它的核心概念、五种工作模式。在安装的时候推荐使用Docker方式进行安装。重点需要理解的就是消息队列的应用场景、核心概念和RabbitMQ的五种工作模式,其中用的比较多的就是发布订阅模式、主题模式。
Message Queue
队列(Queue)是一种常见的数据结构,其最大的特性就是先进先出(Firist In First Out),作为最基础的数据结构,队列应用很广泛,比如我们熟知的Redis基础数据类型List,其底层数据结构就是队列。
消息队列(Messaeg Queue)是一种使用队列(Queue)作为底层存储数据结构,可用于解决不同进程与应用之间通讯的分布式消息容器,也称为消息中间件。
目前使用得比较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ等。本文主要讲述的是RabbitMQ,RabbitMQ是用Erlang语言开发的一个实现了AMQP协议的消息队列服务器,相比其他同类型的消息队列,最大的特点在保证可观的单机吞吐量的同时,延时方面非常出色。
RabbitMQ支持多种客户端,比如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等。
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。这里是AMQP官网 https://amqp.org
MQ应用场景
消息队列使用广泛,其应用场景有很多,下面我们列举比较常见的四个场景:
1、消息通讯
消息队列最主要功能收发消息,其内部有高效的通讯机制,因此非常适合用于消息通讯。
我们可以基于消息队列开发点对点聊天系统,也可以开发广播系统,用于将消息广播给大量接收者。
2、异步处理
一般我们写的程序都是顺序执行(同步执行),比如一个用户注册函数,其执行顺序如下:
- 1、写入用户注册数据。
- 2、发送注册邮件。
- 3、发送注册成功的短信通知。
- 4、更新统计数据。
按照上面的执行顺序,要全部执行完毕,才能返回成功,但其实在第1步执行成功后,其他的步骤完全可以异步执行,我们可以将后面的逻辑发给消息队列,再由其他程序异步执行。使用消息队列进行异步处理,可以更快地返回结果,加快服务器的响应速度,提升了服务器的性能。
3、服务解耦
在我们的系统中,应用与应用之间的通讯是很常见的,一般我们应用之间直接调用,比如说应用A调用应用B的接口,这时候应用之间的关系是强耦合的。
如果应用B处于不可用的状态,那么应用A也会受影响。
在应用A与应用B之间引入消息队列进行服务解耦,如果应用B挂掉,也不会影响应用A的使用。
4、流量削峰
对于高并发的系统来说,在访问高峰时,突发的流量就像洪水般向应用系统涌过来,尤其是一些高并发写操作,随时会导致数据库服务器瘫痪,无法继续提供服务。
而引入消息队列则可以减少突发流量对应用系统的冲击。消息队列就像水库一样,拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的。而在旱季水流量小的时候又可以把水放出来灌溉庄稼。
这方面最常见的例子就是秒杀系统,一般秒杀活动瞬间流量很高,如果流量全部涌向秒杀系统,会压垮秒杀系统,通过引入消息队列,可以有效缓冲突发流量,达到削峰填谷的作用。
RabbitMQ安装
Docker安装方式如下:
1docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
RabbitMQ核心概念
RabbitMQ有属于自己的一套核心概念,对这些概念的理解很重要,只有理解了这些核心概念,才有可能建立对RabbitMQ的全面理解:
Broker
Broker概念比较简单,我们可以把Broker理解为一个RabitMQ Server。
Producer与Consumer
生产者与消费者相对于RabbitMQ服务器来说,都是RabbitMQ服务器的客户端。
- 生产者(Producer):连到RabbitMQ服务器,将消息发送到RabbitMQ服务器的队列,是消息的发送方。
- 消费者(Consumer):连接到RabbitMQ则是为了消费队列中的消息,是消息的接收方。
生产者与消费者一般由我们的应用程序充当。
Connection
Connection是RabbitMQ内部对象之一,用于管理每个到RabbitMQ的TCP网络连接。
Channel
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Exchnage
消息交换机,作用是接收来自生产者的消息,并根据路由键转发消息到所绑定的队列。
生产者发送上的消息,就是先通过Exchnage按照绑定(binding)规则转发到队列的。
交换机类型(Exchange Type)有四种:fanout、direct、topic,headers,其中headers并不常用。
- fanout:这种类型不处理路由键(RoutingKey),很像子网广播,每台子网内的主机都获得了一份复制的消息,发布/订阅模式就是指使用fanout交换机类型,fanout类型交换机转发消息是最快的。
- direct:模式处理路由键,需要路由键完全匹配的队列才能收到消息,路由模式使用的是direct类型的交换机。
- topic:将路由键和某模式进行匹配。主题模式使用的是topic类型的交换机。
路由模式,发布订阅模式,主题模式,这些工作模式我们下面会讲。
Queue
Queue即队列,RabbitMQ内部用于存储消息的对象,是真正用存储消息的结构,在生产端,生产者的消息最终发送到指定队列,而消费者也是通过订阅某个队列,达到获取消息的目的。
Binding
Binding是一种操作,其作用是建立消息从Exchange转发到Queue的规则,在进行Exchange与Queue的绑定时,需要指定一个BindingKey,Binding操作一般用于RabbitMQ的路由工作模式和主题工作模式。
BindingKey的概念,下面在讲RabbitMQ的工作模式会详细讲解。
Virtual Host
Virutal host也叫虚拟主机,一个VirtualHost下面有一组不同Exchnage与Queue,不同的Virtual host的Exchnage与Queue之间互相不影响。应用隔离与权限划分,Virtual host是RabbitMQ中最小颗粒的权限单位划分。
如果要类比的话,我们可以把Virtual host比作MySQL中的数据库,通常我们在使用MySQL时,会为不同的项目指定不同的数据库,同样的,在使用RabbitMQ时,我们可以为不同的应用程序指定不同的Virtual host。
RabbitMQ几种模式
请参考:https://www.rabbitmq.com/getstarted.html
1、简单(simple)模式
simple模式,是RabbitMQ几种模式中最简单的一种模式,其结构如下图所示:
从上面的示意图,我们可以看出simple
模式有以下几个特征:
- 只有一个生产者、一个消费者和一个队列。
- 生产者和消费者在发送和接收消息时,只需要指定队列名,而不需要指定发送到哪个Exchange,RabbitMQ服务器会自动使用Virtual host的默认的Exchange,默认Exchange的type为direct。
1@Slf4j
2@Component
3public class SimpleCustomer {
4 @RabbitListener(queuesToDeclare = @Queue("simpleQueue"))
5 public void process(String message){
6 log.info("SimpleCustomer Message: {}", message);
7 }
8}
9
10
11@Slf4j
12@Component
13public class SimpleProducer extends MqStudyApplicationTests{
14 @Autowired
15 private AmqpTemplate amqpTemplate;
16 @Test
17 public void send(){
18 amqpTemplate.convertAndSend("simpleQueue", "This is a simpleQueue's massage");
19 }
20}
2、工作(work)模式
在simple模式下只有一个生产者和消费者,当生产者生产消息的速度大于消费者的消费速度时,我们可以添加一个或多个消费者来加快消费速度,这种在simple模式下增加消费者的模式,称为work模式,如下图所示:
work模式有以下两个特征:
- 可以有多个消费者,但一条消息只能被一个消费者获取。
- 发送到队列中的消息,由服务器平均分配给不同消费者进行消费
1@Slf4j
2@Component
3public class WorkCustomerA {
4 @RabbitListener(queuesToDeclare = @Queue("workQueue"))
5 public void process(String message){
6 log.info("WorkCustomerA Message: {}", message);
7 }
8}
9
10@Slf4j
11@Component
12public class WorkCustomerB {
13 @RabbitListener(queuesToDeclare = @Queue("workQueue"))
14 public void process(String message){
15 log.info("WorkCustomerB Message: {}", message);
16 }
17}
18
19
20@Component
21public class WorkProducer extends MqStudyApplicationTests {
22 @Autowired
23 private AmqpTemplate amqpTemplate;
24 @Test
25 public void send(){
26 while (true){
27 amqpTemplate.convertAndSend("workQueue", "This is a workQueue's massage");
28 try {
29 TimeUnit.SECONDS.sleep(1);
30 } catch (InterruptedException e) {
31 e.printStackTrace();
32 }
33 }
34 }
35}
3、发布/订阅(pub/sub)模式
work模式可以将消息转到多个消费者,但每条消息只能由一个消费者获取,如果我们想一条消息可以同时给多个消费者消费呢?
这时候就需要发布/订阅模式,其示意图如下所示:
从上面的示意图我们可以看出来,在发布/订阅模式下,需要指定发送到哪个Exchange中,上面图中的X表示Exchange。
- 发布/订阅模式中,Echange的type为fanout。
- 生产者发送消息时,不需要指定具体的队列名,Exchange会将收到的消息转发到所绑定的队列。
- 消息被Exchange转到多个队列,一条消息可以被多个消费者获取。
在上图中,oneQueue中的消息要么被CustomerA获取,要么被CustomerB获取。也就是同一条消息,要么是CustomerA + CustomerC消费、要么是CustomerB + CustomerC 消费。
生产者:
1@Component
2public class PubSubProducer extends MqStudyProducerApplicationTests {
3 @Autowired
4 private AmqpTemplate amqpTemplate;
5
6 @Test
7 public void send(){
8 for (int i = 0; i <= 20; i++) {
9 if(i % 2 == 0){
10 amqpTemplate.convertAndSend("AExchange", "", "hello" + i);
11 }else{
12 amqpTemplate.convertAndSend("BExchange", "", "hello" + i);
13 }
14 }
15 }
16}
不同的Exchange之间互不影响,相同Exchange,相同队列的情况下,消息均等消费:
1@Slf4j
2@Component
3public class PubSubCustomerOne {
4 //AExchange
5 @RabbitListener(bindings = @QueueBinding(
6 value = @Queue("pubSubQueueOne"),
7 exchange = @Exchange(value = "AExchange", type = ExchangeTypes.FANOUT)
8 ))
9 public void process1(String message){
10 log.info("PubSubCustomer process1 Message: {}", message);
11 }
12
13
14 //BExchange
15 @RabbitListener(bindings = @QueueBinding(
16 value = @Queue("pubSubQueueTwo"),
17 exchange = @Exchange(value = "BExchange", type = ExchangeTypes.FANOUT)
18 ))
19 public void process2(String message){
20 log.info("PubSubCustomer process2 Message: {}", message);
21 }
22
23 //BExchange
24 @RabbitListener(bindings = @QueueBinding(
25 value = @Queue("pubSubQueueTwo"),
26 exchange = @Exchange(value = "BExchange", type = ExchangeTypes.FANOUT)
27 ))
28 public void process3(String message){
29 log.info("PubSubCustomer process3 Message: {}", message);
30 }
31}
相同Exchange,不同队列的情况下,一条消息可以被多个消费者获取。
4、路由(routing)模式
前面几种模式,消息的目标队列无法由生产者指定,而在路由模式下,消息的目标队列,可以由生产者指定,其示意图如下所示:
- 路由模式下
Exchange
的type为direct
。 - 消息的目标队列可以由生产者按照
routingKey
规则指定。 - 消费者通过
BindingKey
绑定自己所关心的队列。 - 一条消息队可以被多个消息者获取。
- 只有
RoutingKey
与BidingKey
相匹配的队列才会收到消息。
RoutingKey
用于生产者指定Exchange
最终将消息路由到哪个队列,BindingKey
用于消费者绑定到某个队列。
1// 消费者
2@Slf4j
3@Component
4public class RoutingCustomer {
5 //AExchange
6 @RabbitListener(bindings = @QueueBinding(
7 value = @Queue("routingQueueOne"),
8 key = "routingQueueOne",
9 exchange = @Exchange(value = "AExchange", type = ExchangeTypes.DIRECT)
10 ))
11 public void process1(String message){
12 log.info("Customer process1 Message: {}", message);
13 }
14
15 @RabbitListener(bindings = @QueueBinding(
16 value = @Queue("routingQueueTwo"),
17 key = "routingQueueTwo1",
18 exchange = @Exchange(value = "AExchange", type = ExchangeTypes.DIRECT)
19 ))
20 public void process2(String message){
21 log.info("Customer process2 Message: {}", message);
22 }
23
24 @RabbitListener(bindings = @QueueBinding(
25 value = @Queue("routingQueueTwo"),
26 key = "routingQueueTwo2",
27 exchange = @Exchange(value = "AExchange", type = ExchangeTypes.DIRECT)
28 ))
29 public void process3(String message){
30 log.info("Customer process3 Message: {}", message);
31 }
32}
33
34// 生产者
35@Component
36public class PubSubProducer {
37 @Autowired
38 private AmqpTemplate amqpTemplate;
39
40 @Test
41 public void send(){
42 for (int i = 0; i <= 20; i++) {
43 if(i % 2 == 0){
44 amqpTemplate.convertAndSend("AExchange", "routingQueueTwo1", "hello" + i);
45 }else{
46 amqpTemplate.convertAndSend("AExchange", "routingQueueTwo2", "hello" + i);
47 }
48 amqpTemplate.convertAndSend("AExchange", "routingQueueOne", "hello" + i);
49 }
50 }
51}
5、主题(Topic)模式
主题模式是在路由模式的基础上,将路由键和某模式进行匹配。其中#
表示匹配多个词,*
表示匹配一个词,消费者可以通过某种模式的BindKey来达到订阅某个主题消息的目的,如示意图如下所示:
- 主题模式Exchange的type取值为topic。
- 一条消息可以被多个消费者获取。