RabbitMQ学习日志

本篇文章主要介绍了RabbitMQ这种消息队列,从消息队列的概念、应用场景、安装方式到它的核心概念、五种工作模式。在安装的时候推荐使用Docker方式进行安装。重点需要理解的就是消息队列的应用场景、核心概念和RabbitMQ的五种工作模式,其中用的比较多的就是发布订阅模式、主题模式。

Message Queue

队列(Queue)是一种常见的数据结构,其最大的特性就是先进先出(Firist In First Out),作为最基础的数据结构,队列应用很广泛,比如我们熟知的Redis基础数据类型List,其底层数据结构就是队列。

消息队列(Messaeg Queue)是一种使用队列(Queue)作为底层存储数据结构,可用于解决不同进程与应用之间通讯的分布式消息容器,也称为消息中间件。

目前使用得比较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ等。本文主要讲述的是RabbitMQ,RabbitMQ是用Erlang语言开发的一个实现了AMQP协议的消息队列服务器,相比其他同类型的消息队列,最大的特点在保证可观的单机吞吐量的同时,延时方面非常出色。

RabbitMQ支持多种客户端,比如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等。

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。这里是AMQP官网 https://amqp.org

MQ应用场景

消息队列使用广泛,其应用场景有很多,下面我们列举比较常见的四个场景:

1、消息通讯

消息队列最主要功能收发消息,其内部有高效的通讯机制,因此非常适合用于消息通讯。

我们可以基于消息队列开发点对点聊天系统,也可以开发广播系统,用于将消息广播给大量接收者。

2、异步处理

一般我们写的程序都是顺序执行(同步执行),比如一个用户注册函数,其执行顺序如下:

  • 1、写入用户注册数据。
  • 2、发送注册邮件。
  • 3、发送注册成功的短信通知。
  • 4、更新统计数据。

按照上面的执行顺序,要全部执行完毕,才能返回成功,但其实在第1步执行成功后,其他的步骤完全可以异步执行,我们可以将后面的逻辑发给消息队列,再由其他程序异步执行。使用消息队列进行异步处理,可以更快地返回结果,加快服务器的响应速度,提升了服务器的性能。

3、服务解耦

在我们的系统中,应用与应用之间的通讯是很常见的,一般我们应用之间直接调用,比如说应用A调用应用B的接口,这时候应用之间的关系是强耦合的。

如果应用B处于不可用的状态,那么应用A也会受影响。

在应用A与应用B之间引入消息队列进行服务解耦,如果应用B挂掉,也不会影响应用A的使用。

4、流量削峰

对于高并发的系统来说,在访问高峰时,突发的流量就像洪水般向应用系统涌过来,尤其是一些高并发写操作,随时会导致数据库服务器瘫痪,无法继续提供服务。

而引入消息队列则可以减少突发流量对应用系统的冲击。消息队列就像水库一样,拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的。而在旱季水流量小的时候又可以把水放出来灌溉庄稼。

这方面最常见的例子就是秒杀系统,一般秒杀活动瞬间流量很高,如果流量全部涌向秒杀系统,会压垮秒杀系统,通过引入消息队列,可以有效缓冲突发流量,达到削峰填谷的作用。

RabbitMQ安装

Docker安装方式如下:

1docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

RabbitMQ核心概念

RabbitMQ有属于自己的一套核心概念,对这些概念的理解很重要,只有理解了这些核心概念,才有可能建立对RabbitMQ的全面理解:

Broker

Broker概念比较简单,我们可以把Broker理解为一个RabitMQ Server。

Producer与Consumer

生产者与消费者相对于RabbitMQ服务器来说,都是RabbitMQ服务器的客户端。

  • 生产者(Producer):连到RabbitMQ服务器,将消息发送到RabbitMQ服务器的队列,是消息的发送方。
  • 消费者(Consumer):连接到RabbitMQ则是为了消费队列中的消息,是消息的接收方。

生产者与消费者一般由我们的应用程序充当。

Connection

Connection是RabbitMQ内部对象之一,用于管理每个到RabbitMQ的TCP网络连接。

Channel

Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

Exchnage

消息交换机,作用是接收来自生产者的消息,并根据路由键转发消息到所绑定的队列。

生产者发送上的消息,就是先通过Exchnage按照绑定(binding)规则转发到队列的。

交换机类型(Exchange Type)有四种:fanout、direct、topic,headers,其中headers并不常用。

  • fanout:这种类型不处理路由键(RoutingKey),很像子网广播,每台子网内的主机都获得了一份复制的消息,发布/订阅模式就是指使用fanout交换机类型,fanout类型交换机转发消息是最快的。
  • direct:模式处理路由键,需要路由键完全匹配的队列才能收到消息,路由模式使用的是direct类型的交换机。
  • topic:将路由键和某模式进行匹配。主题模式使用的是topic类型的交换机。

路由模式,发布订阅模式,主题模式,这些工作模式我们下面会讲。

mark

Queue

Queue即队列,RabbitMQ内部用于存储消息的对象,是真正用存储消息的结构,在生产端,生产者的消息最终发送到指定队列,而消费者也是通过订阅某个队列,达到获取消息的目的。

Binding

Binding是一种操作,其作用是建立消息从Exchange转发到Queue的规则,在进行Exchange与Queue的绑定时,需要指定一个BindingKey,Binding操作一般用于RabbitMQ的路由工作模式和主题工作模式。

BindingKey的概念,下面在讲RabbitMQ的工作模式会详细讲解。

Virtual Host

Virutal host也叫虚拟主机,一个VirtualHost下面有一组不同Exchnage与Queue,不同的Virtual host的Exchnage与Queue之间互相不影响。应用隔离与权限划分,Virtual host是RabbitMQ中最小颗粒的权限单位划分。

如果要类比的话,我们可以把Virtual host比作MySQL中的数据库,通常我们在使用MySQL时,会为不同的项目指定不同的数据库,同样的,在使用RabbitMQ时,我们可以为不同的应用程序指定不同的Virtual host。

mark

RabbitMQ几种模式

请参考:https://www.rabbitmq.com/getstarted.html

1、简单(simple)模式

simple模式,是RabbitMQ几种模式中最简单的一种模式,其结构如下图所示:

mark

从上面的示意图,我们可以看出simple模式有以下几个特征:

  • 只有一个生产者、一个消费者和一个队列。
  • 生产者和消费者在发送和接收消息时,只需要指定队列名,而不需要指定发送到哪个Exchange,RabbitMQ服务器会自动使用Virtual host的默认的Exchange,默认Exchange的type为direct。
 1@Slf4j
 2@Component
 3public class SimpleCustomer {
 4    @RabbitListener(queuesToDeclare = @Queue("simpleQueue"))
 5    public void process(String message){
 6        log.info("SimpleCustomer Message: {}", message);
 7    }
 8}
 9
10
11@Slf4j
12@Component
13public class SimpleProducer extends MqStudyApplicationTests{
14    @Autowired
15    private AmqpTemplate amqpTemplate;
16    @Test
17    public void send(){
18        amqpTemplate.convertAndSend("simpleQueue", "This is a simpleQueue's massage");
19    }
20}

mark

2、工作(work)模式

在simple模式下只有一个生产者和消费者,当生产者生产消息的速度大于消费者的消费速度时,我们可以添加一个或多个消费者来加快消费速度,这种在simple模式下增加消费者的模式,称为work模式,如下图所示:

mark

work模式有以下两个特征:

  • 可以有多个消费者,但一条消息只能被一个消费者获取。
  • 发送到队列中的消息,由服务器平均分配给不同消费者进行消费
 1@Slf4j
 2@Component
 3public class WorkCustomerA {
 4    @RabbitListener(queuesToDeclare = @Queue("workQueue"))
 5    public void process(String message){
 6        log.info("WorkCustomerA Message: {}", message);
 7    }
 8}
 9
10@Slf4j
11@Component
12public class WorkCustomerB {
13    @RabbitListener(queuesToDeclare = @Queue("workQueue"))
14    public void process(String message){
15        log.info("WorkCustomerB Message: {}", message);
16    }
17}
18
19
20@Component
21public class WorkProducer extends MqStudyApplicationTests {
22    @Autowired
23    private AmqpTemplate amqpTemplate;
24    @Test
25    public void send(){
26        while (true){
27            amqpTemplate.convertAndSend("workQueue", "This is a workQueue's massage");
28            try {
29                TimeUnit.SECONDS.sleep(1);
30            } catch (InterruptedException e) {
31                e.printStackTrace();
32            }
33        }
34    }
35}

mark

3、发布/订阅(pub/sub)模式

work模式可以将消息转到多个消费者,但每条消息只能由一个消费者获取,如果我们想一条消息可以同时给多个消费者消费呢?

这时候就需要发布/订阅模式,其示意图如下所示:

mark

从上面的示意图我们可以看出来,在发布/订阅模式下,需要指定发送到哪个Exchange中,上面图中的X表示Exchange。

  • 发布/订阅模式中,Echange的type为fanout。
  • 生产者发送消息时,不需要指定具体的队列名,Exchange会将收到的消息转发到所绑定的队列。
  • 消息被Exchange转到多个队列,一条消息可以被多个消费者获取。

在上图中,oneQueue中的消息要么被CustomerA获取,要么被CustomerB获取。也就是同一条消息,要么是CustomerA + CustomerC消费、要么是CustomerB + CustomerC 消费。

生产者:

 1@Component
 2public class PubSubProducer extends MqStudyProducerApplicationTests {
 3    @Autowired
 4    private AmqpTemplate amqpTemplate;
 5
 6    @Test
 7    public void send(){
 8        for (int i = 0; i <= 20; i++) {
 9            if(i % 2 == 0){
10                amqpTemplate.convertAndSend("AExchange", "", "hello" + i);
11            }else{
12                amqpTemplate.convertAndSend("BExchange", "", "hello" + i);
13            }
14        }
15    }
16}

不同的Exchange之间互不影响,相同Exchange,相同队列的情况下,消息均等消费:

 1@Slf4j
 2@Component
 3public class PubSubCustomerOne {
 4    //AExchange
 5    @RabbitListener(bindings = @QueueBinding(
 6            value = @Queue("pubSubQueueOne"),
 7            exchange = @Exchange(value = "AExchange", type = ExchangeTypes.FANOUT)
 8    ))
 9    public void process1(String message){
10        log.info("PubSubCustomer process1 Message: {}", message);
11    }
12
13
14    //BExchange
15    @RabbitListener(bindings = @QueueBinding(
16            value = @Queue("pubSubQueueTwo"),
17            exchange = @Exchange(value = "BExchange", type = ExchangeTypes.FANOUT)
18    ))
19    public void process2(String message){
20        log.info("PubSubCustomer process2 Message: {}", message);
21    }
22
23    //BExchange
24    @RabbitListener(bindings = @QueueBinding(
25            value = @Queue("pubSubQueueTwo"),
26            exchange = @Exchange(value = "BExchange", type = ExchangeTypes.FANOUT)
27    ))
28    public void process3(String message){
29        log.info("PubSubCustomer process3 Message: {}", message);
30    }
31}

相同Exchange,不同队列的情况下,一条消息可以被多个消费者获取。

4、路由(routing)模式

前面几种模式,消息的目标队列无法由生产者指定,而在路由模式下,消息的目标队列,可以由生产者指定,其示意图如下所示:

mark

  • 路由模式下Exchange的type为direct
  • 消息的目标队列可以由生产者按照routingKey规则指定。
  • 消费者通过BindingKey绑定自己所关心的队列。
  • 一条消息队可以被多个消息者获取。
  • 只有RoutingKeyBidingKey相匹配的队列才会收到消息。

RoutingKey用于生产者指定Exchange最终将消息路由到哪个队列,BindingKey用于消费者绑定到某个队列。

 1// 消费者
 2@Slf4j
 3@Component
 4public class RoutingCustomer {
 5    //AExchange
 6    @RabbitListener(bindings = @QueueBinding(
 7            value = @Queue("routingQueueOne"),
 8            key = "routingQueueOne",
 9            exchange = @Exchange(value = "AExchange", type = ExchangeTypes.DIRECT)
10    ))
11    public void process1(String message){
12        log.info("Customer process1 Message: {}", message);
13    }
14
15    @RabbitListener(bindings = @QueueBinding(
16            value = @Queue("routingQueueTwo"),
17            key = "routingQueueTwo1",
18            exchange = @Exchange(value = "AExchange", type = ExchangeTypes.DIRECT)
19    ))
20    public void process2(String message){
21        log.info("Customer process2 Message: {}", message);
22    }
23
24    @RabbitListener(bindings = @QueueBinding(
25            value = @Queue("routingQueueTwo"),
26            key = "routingQueueTwo2",
27            exchange = @Exchange(value = "AExchange", type = ExchangeTypes.DIRECT)
28    ))
29    public void process3(String message){
30        log.info("Customer process3 Message: {}", message);
31    }
32}
33
34// 生产者
35@Component
36public class PubSubProducer {
37    @Autowired
38    private AmqpTemplate amqpTemplate;
39
40    @Test
41    public void send(){
42        for (int i = 0; i <= 20; i++) {
43            if(i % 2 == 0){
44                amqpTemplate.convertAndSend("AExchange", "routingQueueTwo1", "hello" + i);
45            }else{
46                amqpTemplate.convertAndSend("AExchange", "routingQueueTwo2", "hello" + i);
47            }
48            amqpTemplate.convertAndSend("AExchange", "routingQueueOne", "hello" + i);
49        }
50    }
51}

5、主题(Topic)模式

主题模式是在路由模式的基础上,将路由键和某模式进行匹配。其中#表示匹配多个词,*表示匹配一个词,消费者可以通过某种模式的BindKey来达到订阅某个主题消息的目的,如示意图如下所示:

mark

  • 主题模式Exchange的type取值为topic。
  • 一条消息可以被多个消费者获取。