AMQP,全称为:Advanced Message Queuing Protocol,高级消息队列协议,是面向消息中间件的开放标准的二进制应用层协议。AMQP的核心特性是:面向消息、排队、路由(包括点对点和发布订阅)、可靠性和安全性。这些功能使其非常适合在应用程序之间传递业务消息,AMQP还可以用作物联网IoT协议。
目前,AMQP 1.0已经被批准为国际标准,具体规范文档,可以进一步阅读:OASIS Advanced Message Queuing Protocol (AMQP) Version 1.0[1]
而RabbitMQ最初是为了支持AMQP 0-9-1[2]而开发的,因此,该协议是RabbitMQ Broker支持的核心协议。
下面我们就简要介绍下AMQP 0-9-1协议[3]。这部分内容,概念会比较多,稍微有点枯燥,但是可以说RabbitMQ就是按照这个协议去实现的,所以熟悉这个协议很重要。
完整的AMQP文档可以从这里下载:AMQP Working Group 0-9-1[4]
1 AMQP模型概述
1.1 AMQP 0-9-1
AMQP 0-9-1 是一个二进制协议,定义了非常强大的消息传递语义。对于客户端来说,这是一个相对容易实现的协议,因此有大量客户端库可用于许多不同的编程语言和环境。
AMQP 0-9-1通常划分为两层:
功能层(Functional Layer)
:定义了一组命令(按功能做不同的分类),提供给应用程序,用于支撑消息相关的工作;传输层(Transport Layer)
:传输层将这些方法从应用程序传送到服务器并返回,并处理通道多路复用、成帧、内容编码、心跳、数据表示和错误处理。
可以在不改变协议的应用程序相关功能的情况下用任意的传输协议来替换传输层,也可以将传输层用于不同的高级协议。
1.2 AMQP 0-9-1模型简介
如下图,消息Broker(代理)
从消息发布者
(发布消息的应用程序,也称为生产者)接收消息并将它们路由到消费者
(处理消息的应用程序)。
由于AMQP是一个网络协议,因此,生产者
、消费者
、代理
都可以部署在不同的机器上。AMQP模型如下图所示:
消息发布到交换机(exchanges)
(通常将其比作邮局或邮箱),然后使用称为绑定(Bindings)
的规则将消息副本分发到队列(queues)
。然后代理(brokers)
要么将消息传递(deliver)
给订阅队列的消费者(consumers)
,要么消费者主动按需从队列中获取(fetch)
/拉取(pull)
消息。
消息元数据:发布消息的时候,发布者可以指定各种消息元数据(消息属性)
,其中一些元数据可能由代理使用,其余的元数据仅由接收消息的应用程序使用。
消息确认:由于网络是不可靠的,并且应用程序可能无法正确处理消息,因此AMQP 0-9-1模型有一个消息确认的概念:当消息传递给消费者时,消费者会自动或者由开发人员在应用程序中手动指定通知代理Broker,代理只会在收到消息(或消息组)的通知时从队列中完全删除该消息。
死信队列:在某些情况下,例如,当消息无法路由时,消息可能会返回给发布者、或者丢弃掉、或者将其放入所谓的死信队列
(如果代理扩展支持),发布者通过使用某些参数来选择如何处理此类情况。
队列(queues)/交换机(exchanges)和绑定(bindings)统称为AMQP实体。
1.3 AMQP 0-9-1 是一个可编程的协议
AMQP 0-9-1是一种可编程的协议,因为AMQP 0-9-1实体
和路由方案
由应用程序本身定义,而不是代理管理员。因此AMQP制定了一些规定来实现这些协议操作:
- 声明队列和交换机;
- 定义他们之间的绑定;
- 订阅队列等。
这为应用程序开发人员提供了很大的自由,但也要求他们了解潜在的定义冲突。在实践中,定义冲突很少见,通常表示为配置错误。
应用程序声明它们需要的AMQP 0-9-1实体
,定义必要的路由方案
,并在不需要使用它们时进行删除。
2 交换机(Exchanges)和交换机类型
交换机
是发送消息的AMQP 0-9-1实体。交换机收到一条消息,并将其路由到零个或者多个队列
中。咳咳,Java架构杂谈
提醒大家,不要联想到了网络的交换机(Network switch),只是中文名称一样而已。
使用的路由算法
取决于交换机类型
和称为绑定
的规则。以下是AMQP 0-9-1 Broker提供的四种交换机类型:
交换类型 | 默认的预定义名称 |
---|---|
直连交换机(Direct exchange) | 空字符串和amq.direct |
扇形交换机(Fanout exchange) | amq.fanout |
主题交换机(Topic exchange) | amq.topic |
头信息交换机(Headers exchange) | amq.match 和RabbitMQ中的 amq.headers |
除了交换类型之外,交换机还声明了许多属性,关键属性有:
Name
,交换机的名称持久性
,保证交换机在Broker重启后仍然存在,如果没有指定持久,那么交换机在Broker重启后就不存在了,需要重新声明,并非所有场景和用例都要求交换机是持久的;自动删除
,当最后一个队列解除绑定时,交换机被删除;参数
,可选,由插件和特定于代理的功能使用。
2.1 默认交换机
默认交换机是由Broker预先声明的匿名直连交换机。使用默认交换机的时候,每个新建队列都会自动绑定到默认交换机上,绑定的路由键与队列名称相同,默认交换机看起来可以将消息直接传递到队列。
2.2 直连交换机
交换机根据消息路由键(router_key)
将消息传递到队列,消息将会投递到与路由键名称和队列名称相同的队列上。直接交换机是消息单播路由
的理想选择(尽管它们也可以用于多播路由)。
直连交换机如下图所示:
- 一个队列N使用路由键 K 绑定到交换机;
- 当具有路由键 M 的新消息到达直连交换机时,如果 K = M,则交换机将其路由到队列N。
如上图,具有路由键"itzhai.com"的消息达到交换机之后,则会路由到Queue1中。
直连交换机通常用于以循环的方式在多个消费者之间分配任务,也就是说,消息的负载均衡是发生在消费者之间而不是队列之间。
2.3 扇形交换机
扇形交换机将消息路由到绑定到它的所有队列,并且忽略路由键。也就是说,当新消息发布到该交换机时,该消息的副本将投递到所有绑定该交换机的队列。扇形交换机是消息广播
路由的理想选择。
扇形交换机如下图所示:
使用扇形交换机的案例都非常相似:
- 大型多人在线游戏(MMO)可以将其用于排行榜更新或其他全局事件;
- 体育新闻网站可以使用扇形交换机向客户端近乎实时的分发比分信息;
- 分布式系统使用它来广播各种状态和配置更新;
- 群聊可以使用它在参与者之间分发消息(AMQP没有内置presence的概念,因此XMPP可能会是更好的选择)。
2.4 主题交换机
主题交换机根据消息路由键和和用于将队列绑定到交换机的模式匹配字符串之间的匹配将消息路由到一个或者多个队列。
也就是说通过消息的路由键去匹配到绑定到交换机的路由键匹配字符串,如果匹配上了,就进行投递消息。
routing key模糊匹配的通配符如下:
*
:用于匹配一个单词,比如itzhai.com.*
,可以匹配:itzhai.com.a
,itzhai.com.b
#
:用于匹配0个或者多个单词,比如itzhai.com.#
,可以匹配:itzhai.com.a
,itzhai.com.a.b
routing key通过.
分隔字符串。
主题交换机如下图所示:
当生产者发送的routing_key=itzhai.com
的时候,会把消息路由投递到Queue1和Queue2。
当生产者发送的routing_key=www.itzhai.com
的时候,会把消息路由投递到Queue3。
当需要有针对性的选择多个接收消息的消费者或者应用的时候,主题交换机都可以被列入考虑的范围。常见的使用场景:
- 后台任务由多个工作线程完成,每个工作线程负责处理某些特定的任务,这个时候可以通过主题交换机订阅感兴趣的消息;
- 分发特定地理位置的信息,每个工作线程只订阅感兴趣的地理位置的信息;
- …
2.5 头交换机
头交换机不依赖路由键的匹配规则来路由消息,而是根据发送消息内容中的请求头属性进行匹配。
头交换机类似于直连交换机,但是直连交换机的路由键必须是一个字符串,而请求头信息则没有这个约束,它们甚至可以是整数或者字典。因此可以用作路由键不必是字符串的直连交换。
绑定一个队列到头交换机上的时候,会同时绑定多个用于匹配的头信息。
投递消息的时候,可以携带一个x-match
参数,指定是否要求必须所有的头信息都匹配(all)才能进行投递,还是只要匹配任何一个就可以了(any)。
注意:以字符串
x-
打头的头属性,不会作为匹配项。
3 队列(Queues)
AMQP 0-9-1 中的队列与其他消息队列和任务队列系统中的队列类似,用于存储即将被消费的消息。一般地,队列与交换机共享一些属性,但队列也有一些特定的属性:
Name
:队列名称;Durable
:队列持久化,队列在Broker重启之后是否继续存在;Exclusive
:队列是否仅由一个连接使用,如果是,在连接关闭的时候,队列将被删除;Auto-delete
:当最后一个消费者取消订阅的时候,立即删除;Arguments
:可选,一些特定的插件和Broker功能使用,例如实现消息的TTL,队列长度限制等。
关于队列的声明:
在使用队列之前,必须先声明它。声明队列的时候,如果队列尚不存在,则将创建一个队列;如果队列已存在,并且属性与声明中的属性相同,则不用重新创建一个;如果现有队列属性与声明的队列属性不同,将会引发
406(PRECONDITION_FAILED)
的通道级异常。
3.1 队列名称
应用程序可以设置队列名称,如果设置为空字符串,Broker会为它们生成一个唯一的队列名称,在队列声明响应体中一起返回给客户端。队列名称为255个字节以内的UTF-8字符。
以
amq
开头的队列名称,保留给Broker内部使用,如果尝试使用此类名称声明一个队列将导致通道级别异常:403(ACCESS_REFUSED)
。
3.2 队列持久化
持久化的队列的元数据会存储在磁盘上,当Broker重启之后,队列依然存在。没有被持久化的队列称为暂存队列。发布的消息也有同样的区分,也就是说,持久化的队列并不会使得路由到它的消息也具有持久性,需要手动把消息也标记为持久化才能保证消息的持久性。
4 绑定(Bindings)
绑定是交换机将消息路由到队列的规则
。为了让交换机能够正确的把消息投递到对应的队列,需要把交换机和队列通过路由键绑定起来,路由键就像是一个过滤器,决定了消息是否可以投递给消息队列。
如果一条消息不能被路由到任何队列(例如,因为它被发布到的交换机没有绑定),它要么被丢弃,要么返回给发布者,这取决于发布者设置的消息属性。
5 消费者
如果消息只是存储在队列里没有被消费,是没有什么实际作用的。在AMQP 0-9-1中,有两种途径可以进行消息的消费:
- 订阅消息队列,以将消息投递给应用(
push API
),这是推荐的做法; - 应用根据需要主动的轮训获取消息(
pull API
),这种方式非常低效,在大多数情况下应该避免。
如果应用程序对某一个特定队列的消息感兴趣,则可以注册一个消费者,对队列进行订阅。每个队列可以有多个消费者,当然也可以注册一个独享的消费者,这个时候其他消费者会被排除在外。
每个消费者(订阅)都有一个称为消费者标签的字符串类型的标识符,可以用它来退订消息。
5.1 消息确认
消费者应用程序可能偶尔无法处理单个消息或有时会崩溃,另外网络问题也有可能导致问题。这就提出了一个问题:**Broker何时应该从队列中删除消息?**AMQP 0-9-1 规范中约定让消费者对此进行控制,有两种确认模式:
- 自动确认模式:在Broker向应用程序发送消息之后(使用basic.deliver或basic.get-ok方法),将消息从消息队列中删除;
- 显示确认模式:在应用程序向broker发回确认之后(使用basic.ack方法),将消息从消息队列中删除。
在显示模式下,应用程序选择何时发送确认消息。如果消费者在没有发送确认的情况下就挂掉了,那么Broker会将其重新投递给另一个消费者,如果此时没有可用的消费者,那么Broker将等到至少有一个消费者注册到该队列时,再尝试重新投递消息。
另外,如果应用程序崩溃(当连接关闭时 AMQP Broker会感知到这一点),并且AMQP Broker在预期的时间内未收到消息确认,则消息将重新入队,如果此时有其他消费者,可能立即传递给另一个消费者。为此,我们的消费者做好业务的幂等处理也是非常重要的。
5.2 拒绝消息
当消费者接收到消息之后,可能处理成功或者失败。应用程序可以通过拒绝消息向Broker表明消息处理失败了(或者当时无法完成)。拒绝消息的时候,应用程序可以要求Broker丢弃消息或者重新入队。
当队列中只有一个消费者的时候,请确保您不会通过不断地拒绝消息和重新入队导致消息在同一个消费者身上无限循环的情况发生。
在AMQP中,basic.reject
方法用来执行拒绝消息的操作。
5.3 预取消息
在多个消费者共享一个队列的情况,能够制定每个消费者在发送下一个ack之前可以一次性接收多少条消息,这是非常有用的特性。这可以在试图批量发布消息的时候,起到简单的负载均衡和提高消息吞吐量的作用。
请注意:RabbitMQ仅支持通道级预取计数,不支持基于连接或者大小的预取。
6 消息属性和有效负载
AMQP 0-9-1模型中的消息是具有属性的,有些属性非常常见,以至于AMQP 0-9-1明确定义了它们,例如:
Content type
内容类型Content encoding
内容编码Routing key
路由键Delivery mode (persistent or not)
投递模式,是否持久化Message priority
消息优先级Message publishing timestamp
消息发布的时间戳Expiration period
消息有效期Publisher application id
发布消息的应用id
有些属性是被AMQP的Broker所使用的,但是大多数是开放给接收它们的应用程序用的。有些属性是可选的,称为消息头(headers),它们类似于HTTP协议的X-Headers,消息属性需要在消息被发布时定义。
消息体:AMQP消息除了属性之外,还包括一个有效载荷Payload(消息实际携带的数据),AMQP Broker视其为一个透明的字节数组来对待。Broker不会修改payload。消息可能只包含属性而没有payload。payload通常使用JSON、Thrift、Protocol Buffers和MessagePack等序列化格式来序列化成结构化的数据,以便进行发布,协议对等方通常使用Content type
和Content encoding
字段来传达此信息。
消息持久化:消息可以作为持久性发布,这使得Broker将他们持久化到磁盘。如果服务器重启之后,系统可以确保接收到的持久化消息不会丢失。简单的将消息发布到持久化的交换机或者被路由到持久化的队列中,是不会让消息也持久化的,消息是否持久化完全取决于消息本身的持久模式。将消息发布为持久性会影响性能,就像数据存储一样,持久性以一定的性能成本作为代价。
7 AMQP 0-9-1 方法
AMQP 0-9-1中定义了许多操作方法,详细参考:AMQP 0-9-1参考[5]。
很多方法都有对应的响应方法,有些甚至有不止一种可能的响应,如basic.get,响应可能为:get-ok或者get-empty。
如下是声明一个交换机和响应成功的方法:
8 连接(Connections)
AMQP 0-9-1 连接通常是长连接,AMQP 0-9-1 是一种使用TCP提供可靠投递的应用层协议。AMQP 0-9-1连接使用身份认证机制并提供TLS (SSL)保护。当应用程序不再需要连接到Broker时,它应该优雅地关闭其 AMQP 0-9-1 连接,而不是突然关闭底层 TCP 连接。
9 通道(Channels)
某些应用程序需要同时开启连接到Broker,但是,同时保持许多TCP连接是不可取的,这样会消耗系统资源并且使得配置防火墙更加困难。
AMQP 0-9-1通过通道复用技术通过通道的形式实现在一个TCP连接上面支持多个连接(虚拟的链接)。同一个TCP连接中有多个通道,通道之间的通信是完全隔离的。客户端的每个协议操作都携带了一个通道ID,代理和客户端都是用它来确定该操作所走的通道。
通道仅存在于TCP连接上下文中,一旦TCP连接关闭,其上所有通道也跟着关闭。
一般的,我们会给每个线程打开一个新的通道进行通信。
10 虚拟主机
为了让单个代理可以托管多个隔离的环境(用户组、交换机、队列等),AMQP中提供了虚拟主机,这类似于许多流行的Web服务器使用的虚拟主机。协议客户端在连接协商期间需要指定想要使用的虚拟主机。
11. AMQP Client架构
推荐的AQMP Client架构须由下面多个抽象层组成:
成帧层
:此层接收AMQP协议方法,并按某种语言格式(结构、类等)来序列化成线级帧,成帧层可以根据AMQP协议规范来实现;连接管理层
:此层用于读写AMQP帧,并管理所有连接、会话逻辑。在此层中,我们可以封装开启连接和会话、错误处理、内容传输和接收数据的全部逻辑;API层
:此层暴露了应用程序工作的特定API。API层可能会反映一些现有的标准,或暴露高层AMQP的方法。API层本身可能是由多个层组成的,如构建于AMQP方法API之上的高级API;IO层
:此外,通常还会有一些I/O层,这此可以是非常简单的同步套接字读取和写入或复杂的异步多线程IO。
AMQP就介绍到这里了,接下来Java架构杂谈
带大家详细看看RabbitMQ。
References
OASIS Advanced Message Queuing Protocol (AMQP) Version 1.0. Retrieved from http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-complete-v1.0-os.pdf ↩︎
Which protocols does RabbitMQ support?. Retrieved from https://www.rabbitmq.com/protocols.html ↩︎
AMQP 0-9-1 Model Explained. Retrieved from https://www.rabbitmq.com/tutorials/amqp-concepts.html ↩︎
AMQP Working Group 0-9-1. Retrieved from https://www.amqp.org/specification/0-9-1/amqp-org-download ↩︎
AMQP 0-9-1 Complete Reference Guide. Retrieved from https://www.rabbitmq.com/amqp-0-9-1-reference.html ↩︎