概念

队列

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。队列中没有元素时,称为空队列。
队列的数据元素又称为队列元素。在队列中插入一个队列元素称为入队,从队列中删除一个队列元素称为出队。因为队列只允许在一端插入,在另一端删除,所以只有最早进入队列的元素才能最先从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表。

消息队列(Message Queue)

消息队列可以简单理解为把要传输的数据放在队列中。

消息队列是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:

当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。

消息队列主要解决了应用耦合、异步处理、流量削锋等问题。

当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能。

Tips:

  1. 把数据放到消息队列叫做生产者
  2. 从消息队列里边取数据叫做消费者

AMQP和JMS

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

两者的区别和联系
  • JMS是定义了统一接口,对消息操作进行统一;AMQP通过规定协议统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言
  • JMS规定了两种消息模型(queuetopic);而AMQP的模型更丰富

消息队列应用场景

  • 应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;
  • 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;
  • 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;
  • 消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理;

举例:消息队列(mq)是什么?

RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
RabbitMQ服务器是由以高性能、健壮以及可伸缩性出名的 Erlang 语言编写的,而集群和故障转移是构建在开放电信平台框架上的。
所有主要的编程语言均有与代理接口通讯的客户端库。

管理界面

  • Overview: 概览(RabbitMQ的基本信息)
  • Connections: “连接”就是生产者和消费者的连接情况;
    不管生产者还是消费者,其实都是应用程序,都需要和rabbitmq服务器建立连接后才可以完成消息的生产和消费。
  • Channels: “通道”是建立在”连接”基础上的,消息的投递获取依赖通道。实际开发中”连接”应为全局变量,”通道”为线程级;
  • Exchange: 交换机,用来实现消息的路由
  • Queues: 队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。查看队列中此时未被消费的数据条数。
    下方可以查看队列有没有消费者(consumer)
  • 端口:
    • 5672: RabbitMQ的编程语言客户端连接接口
    • 15672: RabbitMQ管理界面连接接口
    • 25672: RabbitMQ集群接口

消息模型

基本消息模型

基本消息模型包含:

  • 消息队列,类似邮箱,可以缓存消息,生产者向其中投递消息,消费者从其中取出消息
  • P:生产者
  • C:消费者

1

消息发送者生产消息发送到队列中,然后消息接收者从队列中取出并且消费消息。消息被消费以后,队列中不再有存储,所以消息接收者不可能消费到已经被消费的消息。

工作队列(Work Queues)

  • 消息队列
  • P: 生产者
  • C1: 消费者1,假设完成速度较慢
  • C2: 消费者2,假设完成速度较快

2

工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享,但是一个消息只会有一个工作者接收。

默认情况下,RabbitMQ将每个消息按顺序发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。(处理任务的时间和接收任务的条数是不影响的,处理慢的服务器其实已经接收到了消息,只是堆积在那里)

为了避免处理过程中的消息丢失,所以引入了消息确认机制,具体见
下文。

发布/订阅模式

工作模式的假设是每一个消息都恰好交付给一个消费者。而发布/订阅模式是将一条消息传达给多个消费者

RabbitMQ消息传递模型的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。

生产者将消息发送到交换机。由交换机来决定最终将消息送至哪些队列,具体的推送规则在定义交换机的时候指定。消费者如果想要获取消息,只需要将自己声明的队列绑定到交换机上就可以收到生产者发送到交换机的消息

交换类型有以下几种:

  • direct: 路由
  • topic: 主题
  • headers: 表头
  • fanout: 广播
Fanout

广播模式下,路由会将消息推送到所有绑定的队列

  • P: 生产者
  • X: 交换机
  • C: 消费者

PHzM6Wcb7ABktfJ

流程如下:

  1. 可以有多个消费者
  2. 每个消费者有自己的queue(队列)
  3. 每个队列都要绑定到Exchange(交换机)
  4. 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  5. 交换机把消息发送给绑定过的所有队列
  6. 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
Direct

在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个路由键(路由key),消息的发送方在向Exchange发送消息时,也必须指定消息的路由键。

zy8uUeTcdOLikgX

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个路由键。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与路由键完全匹配的队列
  • C1:消费者,其所在队列指定了需要路由键为 orange 的消息
  • C2:消费者,其所在队列指定了需要路由键为 black,green 的消息

在这样的设置中,通过路由键orange发布到交换机的消息 将被推送到到队列C1。路由键为black或者green的消息将转到C2。所有其他消息将被丢弃。

WgrXGODvV2BCfIj
多个路由可以使用同一个路由键,如上:C1,C2都绑定了black,带有black路由键的消息将推送到C1,C2。

Topic

Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定RoutingKey的时候使用通配符!

通配符规则:

  • #:匹配一个或多个词
  • *:匹配 1 个词
Headers

Headers和主题模式类似,区别在于:主题模式基于路由键,表头交换基于消息的header数据

消息确认机制ACK

默认情况下如果一个 消息 被消费者所正确接收则会被从 队列 中移除

根据定义,使用消息传递代理的系统(例如RabbitMQ)是分布式的。所以不能保证发送的协议方法(消息)可以到达对等方或被其成功处理。

如果消费者没能成功接收到消息或者在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。因此发布者和消费者都需要一种机制来进行传递和处理确认

从使用者到RabbitMQ的交付处理确认在消息传递协议中称为acknowledgements。对发布者的代理确认是一个称为发布者确认的协议扩展。

消息接收确认分为自动确认手动确认两种

自动确认

在自动确认模式下,消息在发送后立即被视为已成功传递。使用此模式需要在更高的吞吐量(只要消费者可以跟上)和降低交付和消费者处理的安全性之间权衡。此模式通常称为“一劳永逸”。
与手动确认模式不同,如果使用者的TCP连接或通道在成功传递之前已经关闭,或者处理消息时发生异常,那么服务器发送的消息将丢失。因此,自动消息确认应该被认为是不安全的。

手动确认

手动确认可以解决自动确认的问题,但是它也意味着所有的消息在消费端获取到后必须有一个结果返回到服务端。如果未对接收到的消息进行确认,那么此消息在队列中的状态为Unacked,直到消费端连接中断后又变为Ready
所有消费者(包括当前消费者)都无法对Unacked状态的消息进行二次消费,所以此类消息越多占用的内存也越多。而如果消息变为Ready状态,那么已经处理过这条消息的消费者可能会再次接收到这条消息并进行处理。

手动发送的确认可以是肯定的也可以是否定的,并且使用以下协议方法之一:

  • basic.ack用于肯定确认
  • basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
  • basic.reject用于否定确认,但与basic.nack相比有一个限制

肯定的确认只是指示RabbitMQ记录已传递的消息,可以将其丢弃。带有basic.reject的否定确认具有相同的效果。区别主要在语义上:肯定的确认假定消息已成功处理,而否定的消息则表示未处理传递但仍应删除。

持久化

条件:

  1. 交换机持久化,声明durable 为 true
  2. 队列持久化,声明durable 为 true
  3. 消息持久化,声明delivery_mode=2

原生代码

交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 创建连接mq的工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置rabbitmq的主机
connectionFactory.setHost("127.0.0.1");
// 设置端口号
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/admin");
// 设置访问虚拟主机的帐户密码
connectionFactory.setUsername("shiming");
connectionFactory.setPassword("xqm?123456");

// 获取连接对象
Connection connection = connectionFactory.newConnection();

// 获取连接的通道
Channel channel = connection.createChannel();

// 声明交换机
// 三个参数分别为 交换器名、交换器类型、是否持久化
// channel.exchangeDeclare("exchange", "topic", true);

// 通道绑定对应消息队列
// 参数1:队列名称,如果队列不存在将自动创建
// 参数2:定义队列是否持久化 true 持久化
// exclusive:是否独占队列 true 独占
// autoDelete:是否在消费完成后自动删除队列 true 自动删除
// 参数5:额外附加参数
channel.queueDeclare("hello",true,false,false,null);


// 发布消息
// 参数1:交换机名称
// 参数2:队列名称
// 参数3:传递消息额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN表示持久化
// 参数4:消息的具体内容
channel.basicPublish("exchange","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());

channel.close();
connection.close();

SpringBoot中使用

添加依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

1
2
3
4
5
6
7
8
9
10
# ip地址
spring.rabbitmq.host=127.0.0.1
# 端口号
spring.rabbitmq.port=5672
# 帐户
spring.rabbitmq.username=admin
# 密码
spring.rabbitmq.password=123456
# 虚拟主机名
spring.rabbitmq.virtual-host=/test

阿里云服务器需要将5672端口号添加到安全组

简单模型

生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RestController
public class SendController {
// 使用RabbitTemplate,这提供了接收/发送等等方法
@Autowired RabbitTemplate rabbitTemplate;

@GetMapping("/sendMessage")
public String sendDirectMessage() {
// 生成信息
String messageData = "test message, hello!";
String createTime =
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageData", messageData);
map.put("createTime", createTime);
// 将消息携带绑定键值:Hello 发送到
rabbitTemplate.convertAndSend("Hello", map);
return "ok";
}
}
消费者
1
2
3
4
5
6
7
8
9
10
@Component
@RabbitListener(queues = "Hello")
public class Recevier {

@RabbitHandler
public void process(Map testMessage) {
System.out.println("Receiver消费者收到消息 : " + testMessage.toString());
}

}

启动项目调用接口就能看到信息了

工作模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 生产者发送50条数据
public String sendDirectMessage() {
for (int i = 0; i < 50; i++) {
String messageData = "test message, hello!";
String createTime =
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageData", messageData);
map.put("createTime", createTime);
map.put("order",i);
// 将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("Hello", map);
}
return "ok";
}
}
1
2
3
4
5
6
7
8
9
10
11
// 添加消费者2
@RabbitHandler
public void process(Map testMessage) {
// 模拟速度更慢的消费者
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Receiver2消费者收到消息 : " + testMessage.toString());
}

运行后可以看到虽然处理速度不同,但是处理的消息条数是一样的。

参考