0%

RabbitMQ 学习日志

本篇文章主要介绍了 RabbitMQ 这种消息队列,从消息队列的概念、应用场景、安装方式到它的核心概念、五种工作模式。在安装的时候推荐使用 Docker 方式进行安装。重点需要理解的就是消息队列的应用场景、核心概念和 RabbitMQ 的五种工作模式,其中用的比较多的就是发布订阅模式、主题模式。

Message Queue

队列 (Queue) 是一种常见的数据结构,其最大的特性就是先进先出 (Firist In First Out),作为最基础的数据结构,队列应用很广泛,比如我们熟知的 Redis 基础数据类型 List,其底层数据结构就是队列。

消息队列 (Messaeg Queue) 是一种使用队列 (Queue) 作为底层存储数据结构,可用于解决不同进程与应用之间通讯的分布式消息容器,也称为消息中间件。

目前使用得比较多的消息队列有 ActiveMQ,RabbitMQ,Kafka,RocketMQ 等。本文主要讲述的是 RabbitMQ,RabbitMQ 是用 Erlang 语言开发的一个实现了 AMQP 协议的消息队列服务器,相比其他同类型的消息队列,最大的特点在保证可观的单机吞吐量的同时,延时方面非常出色。

RabbitMQ 支持多种客户端,比如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等。

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。这里是 AMQP 官网 https://amqp.org

MQ 应用场景

消息队列使用广泛,其应用场景有很多,下面我们列举比较常见的四个场景:

1、消息通讯

消息队列最主要功能收发消息,其内部有高效的通讯机制,因此非常适合用于消息通讯。

我们可以基于消息队列开发点对点聊天系统,也可以开发广播系统,用于将消息广播给大量接收者。

2、异步处理

一般我们写的程序都是顺序执行 (同步执行),比如一个用户注册函数,其执行顺序如下:

  • 1、写入用户注册数据。
  • 2、发送注册邮件。
  • 3、发送注册成功的短信通知。
  • 4、更新统计数据。

按照上面的执行顺序,要全部执行完毕,才能返回成功,但其实在第 1 步执行成功后,其他的步骤完全可以异步执行,我们可以将后面的逻辑发给消息队列,再由其他程序异步执行。使用消息队列进行异步处理,可以更快地返回结果,加快服务器的响应速度,提升了服务器的性能。

3、服务解耦

在我们的系统中,应用与应用之间的通讯是很常见的,一般我们应用之间直接调用,比如说应用 A 调用应用 B 的接口,这时候应用之间的关系是强耦合的。

如果应用 B 处于不可用的状态,那么应用 A 也会受影响。

在应用 A 与应用 B 之间引入消息队列进行服务解耦,如果应用 B 挂掉,也不会影响应用 A 的使用。

4、流量削峰

对于高并发的系统来说,在访问高峰时,突发的流量就像洪水般向应用系统涌过来,尤其是一些高并发写操作,随时会导致数据库服务器瘫痪,无法继续提供服务。

而引入消息队列则可以减少突发流量对应用系统的冲击。消息队列就像水库一样,拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的。而在旱季水流量小的时候又可以把水放出来灌溉庄稼。

这方面最常见的例子就是秒杀系统,一般秒杀活动瞬间流量很高,如果流量全部涌向秒杀系统,会压垮秒杀系统,通过引入消息队列,可以有效缓冲突发流量,达到削峰填谷的作用。

RabbitMQ 安装

Docker 安装方式如下:

1
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

RabbitMQ 核心概念

RabbitMQ 有属于自己的一套核心概念,对这些概念的理解很重要,只有理解了这些核心概念,才有可能建立对 RabbitMQ 的全面理解:

Broker

Broker 概念比较简单,我们可以把 Broker 理解为一个 RabitMQ Server。

Producer 与 Consumer

生产者与消费者相对于 RabbitMQ 服务器来说,都是 RabbitMQ 服务器的客户端。

  • 生产者 (Producer):连到 RabbitMQ 服务器,将消息发送到 RabbitMQ 服务器的队列,是消息的发送方。
  • 消费者 (Consumer):连接到 RabbitMQ 则是为了消费队列中的消息,是消息的接收方。

生产者与消费者一般由我们的应用程序充当。

Connection

Connection 是 RabbitMQ 内部对象之一,用于管理每个到 RabbitMQ 的 TCP 网络连接。

Channel

Channel 是我们与 RabbitMQ 打交道的最重要的一个接口,我们大部分的业务操作是在 Channel 这个接口中完成的,包括定义 Queue、定义 Exchange、绑定 Queue 与 Exchange、发布消息等。

Exchnage

消息交换机,作用是接收来自生产者的消息,并根据路由键转发消息到所绑定的队列。

生产者发送上的消息,就是先通过 Exchnage 按照绑定 (binding) 规则转发到队列的。

交换机类型 (Exchange Type) 有四种:fanout、direct、topic,headers,其中 headers 并不常用。

  • fanout:这种类型不处理路由键 (RoutingKey),很像子网广播,每台子网内的主机都获得了一份复制的消息,发布 / 订阅模式就是指使用 fanout 交换机类型,fanout 类型交换机转发消息是最快的。
  • direct:模式处理路由键,需要路由键完全匹配的队列才能收到消息,路由模式使用的是 direct 类型的交换机。
  • topic:将路由键和某模式进行匹配。主题模式使用的是 topic 类型的交换机。

路由模式,发布订阅模式,主题模式,这些工作模式我们下面会讲。

mark

Queue

Queue 即队列,RabbitMQ 内部用于存储消息的对象,是真正用存储消息的结构,在生产端,生产者的消息最终发送到指定队列,而消费者也是通过订阅某个队列,达到获取消息的目的。

Binding

Binding 是一种操作,其作用是建立消息从 Exchange 转发到 Queue 的规则,在进行 Exchange 与 Queue 的绑定时,需要指定一个 BindingKey,Binding 操作一般用于 RabbitMQ 的路由工作模式和主题工作模式。

BindingKey 的概念,下面在讲 RabbitMQ 的工作模式会详细讲解。

Virtual Host

Virutal host 也叫虚拟主机,一个 VirtualHost 下面有一组不同 Exchnage 与 Queue,不同的 Virtual host 的 Exchnage 与 Queue 之间互相不影响。应用隔离与权限划分,Virtual host 是 RabbitMQ 中最小颗粒的权限单位划分。

如果要类比的话,我们可以把 Virtual host 比作 MySQL 中的数据库,通常我们在使用 MySQL 时,会为不同的项目指定不同的数据库,同样的,在使用 RabbitMQ 时,我们可以为不同的应用程序指定不同的 Virtual host。

mark

RabbitMQ 几种模式

请参考:https://www.rabbitmq.com/getstarted.html

1、简单 (simple) 模式

simple 模式,是 RabbitMQ 几种模式中最简单的一种模式,其结构如下图所示:

mark

从上面的示意图,我们可以看出 simple 模式有以下几个特征:

  • 只有一个生产者、一个消费者和一个队列。
  • 生产者和消费者在发送和接收消息时,只需要指定队列名,而不需要指定发送到哪个 Exchange,RabbitMQ 服务器会自动使用 Virtual host 的默认的 Exchange,默认 Exchange 的 type 为 direct。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
@Component
public class SimpleCustomer {
@RabbitListener (queuesToDeclare = @Queue ("simpleQueue"))
public void process(String message){
log.info ("SimpleCustomer Message: {}", message);
}
}


@Slf4j
@Component
public class SimpleProducer extends MqStudyApplicationTests{
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void send(){
amqpTemplate.convertAndSend ("simpleQueue", "This is a simpleQueue's massage");
}
}

mark

2、工作 (work) 模式

在 simple 模式下只有一个生产者和消费者,当生产者生产消息的速度大于消费者的消费速度时,我们可以添加一个或多个消费者来加快消费速度,这种在 simple 模式下增加消费者的模式,称为 work 模式,如下图所示:

mark

work 模式有以下两个特征:

  • 可以有多个消费者,但一条消息只能被一个消费者获取。
  • 发送到队列中的消息,由服务器平均分配给不同消费者进行消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
@Component
public class WorkCustomerA {
@RabbitListener (queuesToDeclare = @Queue ("workQueue"))
public void process(String message){
log.info ("WorkCustomerA Message: {}", message);
}
}

@Slf4j
@Component
public class WorkCustomerB {
@RabbitListener (queuesToDeclare = @Queue ("workQueue"))
public void process(String message){
log.info ("WorkCustomerB Message: {}", message);
}
}


@Component
public class WorkProducer extends MqStudyApplicationTests {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void send(){
while (true){
amqpTemplate.convertAndSend ("workQueue", "This is a workQueue's massage");
try {
TimeUnit.SECONDS.sleep (1);
} catch (InterruptedException e) {
e.printStackTrace ();
}
}
}
}

mark

3、发布 / 订阅 (pub/sub) 模式

work 模式可以将消息转到多个消费者,但每条消息只能由一个消费者获取,如果我们想一条消息可以同时给多个消费者消费呢?

这时候就需要发布 / 订阅模式,其示意图如下所示:

mark

从上面的示意图我们可以看出来,在发布 / 订阅模式下,需要指定发送到哪个 Exchange 中,上面图中的 X 表示 Exchange。

  • 发布 / 订阅模式中,Echange 的 type 为 fanout。
  • 生产者发送消息时,不需要指定具体的队列名,Exchange 会将收到的消息转发到所绑定的队列。
  • 消息被 Exchange 转到多个队列,一条消息可以被多个消费者获取。

在上图中,oneQueue 中的消息要么被 CustomerA 获取,要么被 CustomerB 获取。也就是同一条消息,要么是 CustomerA + CustomerC 消费、要么是 CustomerB + CustomerC 消费。

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class PubSubProducer extends MqStudyProducerApplicationTests {
@Autowired
private AmqpTemplate amqpTemplate;

@Test
public void send(){
for (int i = 0; i <= 20; i++) {
if(i % 2 == 0){
amqpTemplate.convertAndSend ("AExchange", "", "hello" + i);
}else{
amqpTemplate.convertAndSend ("BExchange", "", "hello" + i);
}
}
}
}

不同的 Exchange 之间互不影响,相同 Exchange,相同队列的情况下,消息均等消费:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
@Component
public class PubSubCustomerOne {
//AExchange
@RabbitListener (bindings = @QueueBinding (
value = @Queue ("pubSubQueueOne"),
exchange = @Exchange (value = "AExchange", type = ExchangeTypes.FANOUT)
))
public void process1(String message){
log.info ("PubSubCustomer process1 Message: {}", message);
}


//BExchange
@RabbitListener (bindings = @QueueBinding (
value = @Queue ("pubSubQueueTwo"),
exchange = @Exchange (value = "BExchange", type = ExchangeTypes.FANOUT)
))
public void process2(String message){
log.info ("PubSubCustomer process2 Message: {}", message);
}

//BExchange
@RabbitListener (bindings = @QueueBinding (
value = @Queue ("pubSubQueueTwo"),
exchange = @Exchange (value = "BExchange", type = ExchangeTypes.FANOUT)
))
public void process3(String message){
log.info ("PubSubCustomer process3 Message: {}", message);
}
}

相同 Exchange,不同队列的情况下,一条消息可以被多个消费者获取。

4、路由 (routing) 模式

前面几种模式,消息的目标队列无法由生产者指定,而在路由模式下,消息的目标队列,可以由生产者指定,其示意图如下所示:

mark

  • 路由模式下 Exchange 的 type 为 direct
  • 消息的目标队列可以由生产者按照 routingKey 规则指定。
  • 消费者通过 BindingKey 绑定自己所关心的队列。
  • 一条消息队可以被多个消息者获取。
  • 只有 RoutingKeyBidingKey 相匹配的队列才会收到消息。

RoutingKey 用于生产者指定 Exchange 最终将消息路由到哪个队列,BindingKey 用于消费者绑定到某个队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 消费者 
@Slf4j
@Component
public class RoutingCustomer {
//AExchange
@RabbitListener (bindings = @QueueBinding (
value = @Queue ("routingQueueOne"),
key = "routingQueueOne",
exchange = @Exchange (value = "AExchange", type = ExchangeTypes.DIRECT)
))
public void process1(String message){
log.info ("Customer process1 Message: {}", message);
}

@RabbitListener (bindings = @QueueBinding (
value = @Queue ("routingQueueTwo"),
key = "routingQueueTwo1",
exchange = @Exchange (value = "AExchange", type = ExchangeTypes.DIRECT)
))
public void process2(String message){
log.info ("Customer process2 Message: {}", message);
}

@RabbitListener (bindings = @QueueBinding (
value = @Queue ("routingQueueTwo"),
key = "routingQueueTwo2",
exchange = @Exchange (value = "AExchange", type = ExchangeTypes.DIRECT)
))
public void process3(String message){
log.info ("Customer process3 Message: {}", message);
}
}

// 生产者
@Component
public class PubSubProducer {
@Autowired
private AmqpTemplate amqpTemplate;

@Test
public void send(){
for (int i = 0; i <= 20; i++) {
if(i % 2 == 0){
amqpTemplate.convertAndSend ("AExchange", "routingQueueTwo1", "hello" + i);
}else{
amqpTemplate.convertAndSend ("AExchange", "routingQueueTwo2", "hello" + i);
}
amqpTemplate.convertAndSend ("AExchange", "routingQueueOne", "hello" + i);
}
}
}

5、主题 (Topic) 模式

主题模式是在路由模式的基础上,将路由键和某模式进行匹配。其中 # 表示匹配多个词,* 表示匹配一个词,消费者可以通过某种模式的 BindKey 来达到订阅某个主题消息的目的,如示意图如下所示:

mark

  • 主题模式 Exchange 的 type 取值为 topic。
  • 一条消息可以被多个消费者获取。

欢迎关注我的其它发布渠道