AMQP 0-9-1特性 – RabbitMQ教程(二)

RabbitMQ是基于AMQP 0-9-1协议的实现,AMQP 0-9-1(高级消息队列协议)是一种消息传递协议,使一致的客户端应用程序可以与一致的消息传递中间件代理进行通信。

img

下面我们会围绕这个图来进行讲解。

简述处理流程

假设我们有一条 "Hello,World"消息要使用RabbitMQ通过服务器发送给客户端,会是怎样一个流程呢?

Publish path from publisher to consumer via exchange and queue

首先,服务端调用RabbitMQ的SDK,作为Publisher将 "Hello,World"这条消息发送到Broker的交换机(Exchange)上,然后根据交换机对应的路由规则将消息发送到不同的消息队列(Queue),客户端(Customer)根据需要使用SDK从Broker中对应的Queue中主动拉取或由Broker推送获取到这条"Hello,World"。然而,在这一过程中,"Hello,World"这一消息对Broker来说是不透明的,Broker接收二进制数据,它不知道怎么解码该消息,只有Publisher和Customer才明白这条消息的内容和结构。

交换机(Exchange)

支持的类型

根据交换机的类型和交换机的规则(这里的类型和规则我们称为Bindings),其路由算法会有所不同。下面将列出四种AMQP 0-9-1 Broker支持的交换机类型

交换机类型 预声名名称
Direct Exchange 直连交换机 空字符或amq.direct
Fanout Exchange 扇出交换机 amq.fanout
Topic Exchange 主题交换机 amq.topic
Headers Exchange 头部信息交换机 amq.match或amq.headers(RabbitMQ支持)

除了交换机类型,还有一部分重要的元信息需要声名。

元信息 作用
name 交换机名称
durability Broker重启后是否恢复
auto-delete 自动删除,如果该交换机最后一个队列被解除绑定时,该交换机是否会被删除
arguments 可选参数,根据插件和Broker的特性来决定其内容

交换机可以持久化也可以是非持久化的,持久化的交换机在Broker(例如RabbitMQ)服务重启后仍存在,而非持久化交换机在Broker重启后会丢失,再次使用需要重新声名。并不是所有场景都需要持久化交换机,具体使用哪种需要根据应用场景来具体分析。

默认交换机

默认交换机是Direct Exchange即直连交换机,对于简单场景情况下使用直连交换机是非常方便的,这里的路由键是其消息队列名称,消息将会直接被发送到对应名称的消息队列。

举个例子,例如你创建了一个叫做"kline"的队列,你的单个策略程序作为Customer监听该队列,行情获取服务作为Publisher从交易所获取K线数据并通过"kline"队列进行二次分发,RabbitMQ将会将"kline"作为路由键(或称绑定键)将其绑定到默认交换机上,这时一条K线信息被行情获取服务publish到默认交换机上,默认交换机则直接转发到"kline"队列后推送到你的策略程序。

这里看上去交换机没有任何特殊作用,似乎消息是通过

Publisher —-> "kline" Queue —-> Customer

这种传递途径直接传递的,然而事实上它仍然经过了默认交换机(这里默认交换机被指定为直连交换机)

Publisher —-> Direct Exchange —-> "kline" Queue —-> Customer

只不过直连交换机的作用仅仅是将消息转发到路由键同名的队列中。

直连交换机

exchange delivering messages to  queues based on routing key

上面我在单个上加了黑体,为什么呢?因为在多个Customer 共用同一个直连交换机下的同一消息队列时,消息只会发送给其中一个Customer,这就是直连交换机的本质 — 1 => 1

直连交换机通常用于通过循环方式在多个工作线程(同一应用的多个实例)之间分配任务,例如爬虫开启多个线程下载不同的网页,我们让多个线程共同监听同一个消息队列,然后解析器将解析出来的下载链接发送到该队列中,RabbitMQ会根据顺序将链接派发给其中一个线程来进行下载。

扇出交换机

扇出交换机是一种一对多的模式,或者我们叫1 => N。当有n个队列绑定到同一个扇出交换机,且有一条消息被publish到该交换机时,交换机会将该消息的副本传递到n个队列中,而不像直连交换机只选择其中一个进行发送。扇出交换机使用的场景是广播事件。

例如我们有多个策略共用同上证指数信息,我们可以让每一个策略创建一个队列并绑定到扇出交换机上,行情获取服务从交易所获取到上证指数后publish到扇出交换机,扇出交换机将上证指数广播到每一个策略的队列中。

扇出交换机可以用下面的图形表示:

exchange delivering messages to three queues

主题交换机

主题交换机根据路由键及绑定的模式将信息路由到一个或多个队列,主题交换机常用于实现各种不同的发布订阅模式需求,使用场景是多播事件。

例如我的行情获取服务同时获取了多只股票的行情信息,而我的某个策略仅需要其中两个股票的行情信息,我可以通过设置消息键为股票名称来进行筛选,仅将我需要的股票发送到策略的消息队列中。

头部信息交换机

由于性能问题,头部信息交换机不常用,此处不赘述。

队列

AMQP 0-9-1模型中的队列和其他系统的队列高度相似,它们都用来储存信息。队列和交换机有一些共性,也有一些额外的属性:

元信息 作用
name 交换机名称
durable Broker重启后是否仍存在
exclusive 独占模式,仅由一个连接使用,当该连接关闭时,该队列将会被删除
auto-delete 自动删除,如果队列最后一个消费者连接取消订阅时,该队列是否会被删除
arguments 可选参数,根据插件和Broker的特性来决定其内容,如消息 TTL、队列长度限制等

消费者

消息队列中获取消息的方式有两种:

  • 订阅分发(Push API)推荐
  • 轮询拉去(Pull API)不推荐

每个消费者都有对应的标识符,它可以用来退订消息,即取消队列的绑定。

消息确认

由于网络的不可靠性,Customer可能无法处理消息,在AMQP 0-9-1模型中有一个消息确认机制,当消息被Customer接收时,Customer会手动或自动通知Broker确认已接收到消息,而后Broker才会将消息从消息队列中完全删除。

消息确认包括隐式确认(自动确认)和显示确认(手动确认),其删除方式包括两种:

  • 消费者发送basic.deliverbasic.get-ok之后进行删除
  • 消费者发送basic.ack之后进行删除

前者即隐式确认,后者为显示确认。如果一个消费者没有发送确认消息就断开连接了,那么Broker将会把消息重新发送给另一个消费者或者当没有消费者时等待新消费者上线后再重新发送。

拒收消息

消费者接收到消息后会对消息进行处理,对该消息的操作可能成功也可能失败,消费者可以通过拒收消息basic.reject表示该消息处理失败,同时告知Broker丢弃该消息或重新请求该条消息。注意不要通过持续拒收和重新获取同一条消息来陷入死循环。

负确认

RabbitMQ提供了一个扩展指令称为负面致谢,弥补上面的拒收消息(basic.reject)只能拒收单条消息的情况。

交换机操作

创建新交换机

exchange.declare 声明一个交换机

exchange.declare-ok声名成功

exchange.declare

删除已有交换机

exchange.delete 删除交换机

exchange.delete-ok 删除成功

exchange.declare-ok

队列操作

创建新队列

queue.declare 声明一个队列

queue.declare-ok `声名成功。

queue.declare

queue.declare-ok

 

信道

如果项目需要发布消息,那么必须要链接到 RabbitMQ,而项目于 RabbitMQ之间使用 TCP 连接,加入每次发布消息都要连接TCP,这不仅会造成连接资源严重浪费,会造成服务器性能瓶颈,所以 RabbitMQ 为所有的线程只用一条 TCP 连接,怎么实现的呢?RabbitMQ 引入了信道的概念,所有需要发布消息的线程都包装成一条信道在 TCP 中传输,理论上 一条 TCP 连接支持无限多个信道,模型如下:

mq

本文系作者 @ 原创发布在 CycleGen。未经许可,禁止转载。

喜欢()
评论 (0)
    热门搜索
    173 文章
    1 评论
    47 喜欢
    Top