scribble

ottocho's blog

Home About GitHub

09 Mar 2013
Rabbitmq Introduction

rabbitmq及amqp介绍


前言

这应该是一篇很旧的文章的译文,听说英文原文写得很好,但是原链有点问题了here

译文虽说是翻译得很风趣,但是我嫌写的不够直接。所以我在这里整理汇编了下,另外后续再把官方的tutorial翻译整理下,作为初接触rabbitmq的学习笔记。估计其他人看了会有点困。应该看原文。

原文于此,感谢之。here


概念

AMQP当中有四个概念非常重要

  1. virtual host,虚拟主机
  2. exchange,交换机
  3. queue,队列
  4. binding,绑定

一个虚拟主机持有一组交换机、队列和绑定。

为什么需要多个虚拟主机呢?因为RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机/

何谓虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)

队列(Queues)是你的消息(messages)的终点,可以理解成装消息的容器。消息就一直在里面,直到有客户端(也就是消费者,Consumer)连接到这个队列并且将其取走为止。不过,也可以将一个队列配置成这样的:一旦消息进入这个队列,此消息就被删除。

队列是由消费者(Consumer)通过程序建立的,不是通过配置文件或者命令行工具。这没什么问题,如果一个消费者试图创建一个已经存在的队列,RabbitMQ会直接忽略这个请求。因此我们可以将消息队列的配置写在应用程序的代码里面。

而要把一个消息放进队列前,需要有一个交换机(Exchange)。

交换机(Exchange)可以理解成具有路由表的路由程序。每个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串。交换机当中有一系列的绑定(binding),即路由规则(routes)。(例如,指明具有路由键 “X” 的消息要到名为timbuku的队列当中去。)

消费者程序(Consumer)要负责创建你的交换机。交换机可以存在多个,每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个进程,可以充分利用服务器上的CPU核以便达到更高的效率。例如,在一个8核的服务器上,可以创建5个交换机来用5个核,另外3个核留下来做消息处理。类似的,在RabbitMQ的集群当中,你可以用类似的思路来扩展交换机一边获取更高的吞吐量。

交换机如何判断要把消息送到哪个队列?你需要路由规则,即绑定(binding)。一个绑定就是一个类似这样的规则:将交换机“desert(沙漠)”当中具有路由键“阿里巴巴”的消息送到队列“hideout(山洞)”里面去。换句话说,一个绑定就是一个基于路由键将交换机和队列连接起来的路由规则。例如,具有路由键“audit”的消息需要被送到两个队列,“log-forever”和“alert-the-big-dude”。要做到这个,就需要创建两个绑定,每个都连接一个交换机和一个队列,两者都是由“audit”路由键触发。在这种情况下,交换机会复制一份消息并且把它们分别发送到两个队列当中。交换机不过就是一个由绑定构成的路由表。

交换机有多种类型。他们都是做路由的,但是它们接受不同类型的绑定。为什么不创建一种交换机来处理所有类型的路由规则呢?因为每种规则用来做匹配分子的CPU开销是不同的。例如,一个“topic”类型的交换机试图将消息的路由键与类似“dogs.*”的模式进行匹配。匹配这种末端的通配符比直接将路由键与“dogs”比较(“direct”类型的交换机)要消耗更多的CPU。如果你不需要“topic”类型的交换机带来的灵活性,你可以通过使用“direct”类型的交换机获取更高的处理效率。那么有哪些类型,他们又是怎么处理的呢?

  1. Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
  2. Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 dog,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
  3. Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号*匹配不多不少一个词。因此audit.#能够匹配到audit.irs.corporate,但是audit.*只会匹配到audit.irs。下图用以表明topic交换机是如何工作的:

topic_exchange


持久化

你花了大量的时间来创建队列、交换机和绑定,然后,服务器程序挂了。你的队列、交换机和绑定怎么样了?还有,放在队列里面但是尚未处理的消息们呢?

如果你是用默认参数构造的这一切的话,那么,他们都灰飞烟灭了。RabbitMQ重启之后会干净的像个新生儿。你必须重做所有的一切,亡羊补牢,如何避免将来再度发生此类杯具?

队列和交换机有一个创建时候指定的标志durable。durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。那么如何才能做到不只是队列和交换机,还有消息都是持久的呢?

但是首先需要考虑的问题是:是否真的需要消息的持久化?如果需要重启后消息可以回复,那么它需要被写入磁盘。但即使是最简单的磁盘操作也是要消耗时间的。所以需要衡量判断。

当你将消息发布到交换机的时候,可以指定一个标志“Delivery Mode”(投递模式)。根据你使用的AMQP的库不同,指定这个标志的方法可能不太一样。简单的说,就是将Delivery Mode设置成2,也就是持久的(persistent)即可。一般的AMQP库都是将Delivery Mode设置成1,也就是非持久的。所以要持久化消息的步骤如下:

  1. 将交换机设成 durable。
  2. 将队列设成 durable。
  3. 将消息的 Delivery Mode 设置成2 。

绑定(Bindings)怎么办?绑定无法在创建的时候设置成durable。没问题,如果你绑定了一个durable的队列和一个durable的交换机,RabbitMQ会自动保留这个绑定。类似的,如果删除了某个队列或交换机(无论是不是durable),依赖它的绑定都会自动删除。

注意:

  • RabbitMQ 不允许你绑定一个非坚固(non-durable)的交换机和一个durable的队列。反之亦然。要想成功必须队列和交换机都是durable的。
  • 一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。因此,最好仔细检查创建的标志。

Sample - AMQP与Python

有几个AMQP库可选:

  • py-amqplib - 通用的AMQP(久不更新了,这篇很旧的文章用来做范例)
  • txAMQP - 使用 Twisted 框架的AMQP库,因此允许异步I/O。
  • pika - 官方tutorial使用库

根据你的需求,py-amqplib或者txAMQP都是可以的。因为是基于Twisted的(这个玩意也是个很复杂的东西),txAMQP可以保证用异步IO构建超高性能的AMQP程序。但是Twisted编程本身就是一个很大的主题。因此清晰起见,我们打算用 py-amqplib。(这个库已经很久不更新了,推荐使用pika,本文做个暂时的例子)。

AMQP支持在一个TCP连接上启用多个MQ通信channel,每个channel都可以被应用作为通信流。每个AMQP程序至少要有一个连接和一个channel。

from amqplib import client_0_8 as amqp

conn = amqp.Connection( host="localhost:5672", userid="guest", password="guest",
                        virtual_host="/", insist=False )
chan = conn.channel()

每个channel都被分配了一个整数标识,自动由Connection()类的.channel()方法维护。或者可以使用.channel(x)来指定channel标识,其中x是你想要使用的channel标识。通常情况下,推荐使用.channel()方法来自动分配channel标识,以便防止冲突。

现在我们已经有了一个可以用的连接和channel。现在,我们的代码将分成两个应用,生产者(producer)和消费者(consumer)。

先创建一个消费者程序,他会创建一个叫做po_box的队列和一个叫sorting_room的交换机:

#!/usr/bin/env python

from amqplib import client_0_8 as amqp
conn = amqp.Connection( host="localhost:5672", userid="guest", password="guest",
                        virtual_host="/", insist=False )
chan = conn.channel()

chan.queue_declare(queue="po_queue", durable=True, exclusive=False, auto_delete=False)
chan.exchange_declare( exchange="po_exchange", type="direct",
                       durable=True, auto_delete=False)
chan.queue_bind(queue="po_queue", exchange="po_exchange", routing_key="otto")

# msg = chan.basic_get("po_queue")
# if msg:
#    print msg.body
#    chan.basic_ack(msg.delivery_tag)
# else:
#    print 'no msg from po_queue'

def recv_callback(msg):
    print 'Received: ' + msg.body
chan.basic_consume( queue='po_queue', no_ack=True,
                    callback=recv_callback, consumer_tag="testtag")
while True:
    chan.wait()
chan.basic_cancel("testtag")

这段代码干了啥?

首先,它创建了一个名叫po_box的队列queue_declare,它是durable的(重启之后会重新建立)(durable=True),并且最后一个消费者断开的时候不会自动删除auto_delete=False。在创建durable的队列(或者交换机)的时候,将auto_delete设置成false是很重要的,否则队列将会在最后一个消费者断开的时候消失,与durable与否无关。如果将durableauto_delete都设置成True,只有尚有消费者活动的队列可以在RabbitMQ意外崩溃的时候自动恢复。而exclusive如果设置成True,只有创建这个队列的消费者程序才允许连接到该队列。这种队列对于这个消费者程序是私有的。

还有另一个交换机声明exchange_declare,创建了一个名字叫sorting_room的交换机。auto_delete和durable的含义和queue是一样的。但是,.excange_declare()还有另外一个参数叫做type,用来指定要创建的交换机的类型(如前面列出的): fanoutdirecttopic。 我们需要创建一个绑定,把它们连接起来。(queue_bindqueueexchangerouting_key连接起来)。这个绑定的过程非常直接。任何送到交换机sorting_room的具有路由键otto的消息都被路由到名为po_box的队列。

现在,有两种方法从队列当中取出消息。

第一个是调用chan.basic_get(),主动从队列当中拉出下一个消息(如果队列当中没有消息,chan.basic_get()会返回None, 因此下面代码当中print msg.body会在没有消息的时候崩掉):

msg = chan.basic_get("po_box")  
print msg.body  
chan.basic_ack(msg.delivery_tag)     如果想要应用程序在消息到达的时候立即得到通知,需要用`chan.basic_consume()`注册一个新消息到达的回调。

def recv_callback(msg):
    print 'Received: ' + msg.body
chan.basic_consume(queue='po_queue', no_ack=True, callback=recv_callback, consumer_tag="testtag")
while True:
    chan.wait()
chan.basic_cancel("testtag")

chan.wait()放在一个无限循环里面,这个函数会等待在队列上,直到下一个消息到达队列。 chan.basic_cancel()用来注销该回调函数。参数consumer_tag当中指定的字符串和chan.basic_consume()注册的一致。

在这个例子当中chan.basic_cancel()不会被调用到,因为上面是个无限循环。不过你需要知道这个调用,所以我把它放在了代码里。

需要注意的另一个东西是no_ack参数。这个参数可以传给chan.basic_get()chan.basic_consume(),默认是false。当从队列当中取出一个消息的时候,RabbitMQ需要应用显式地回馈说已经获取到了该消息。如果一段时间内不回馈,RabbitMQ会将该消息重新分配给另外一个绑定在该队列上的消费者。另一种情况是消费者断开连接,但是获取到的消息没有回馈,则RabbitMQ同样重新分配。如果将no_ack参数设置为true,则py-amqplib会为下一个AMQP请求添加一个no_ack属性,告诉AMQP服务器不需要等待回馈。但是,大多数时候,你也许想要自己手工发送回馈,例如,需要在回馈之前将消息存入数据库。回馈通常是通过调用chan.basic_ack()方法,使用消息的delivery_tag属性作为参数。参见chan.basic_get()的实例代码。

没有人发送消息的话,要消费者何用?

生产者如下。下面的代码示例表明如何将一个简单消息发送到交换区,并且标记为路由键otto

#!/usr/bin/env python

from amqplib import client_0_8 as amqp
import sys

conn = amqp.Connection( host="localhost:5672", userid="guest", password="guest",
                        virtual_host="/", insist=False )
chan = conn.channel()

msg = amqp.Message(sys.argv[1])
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg, exchange="po_exchange", routing_key="otto")

chan.close()
conn.close()

你也许注意到我们设置消息的delivery_mode属性为2,因为队列和交换机都设置为durable的,这个设置将保证消息能够持久化,也就是说,当它还没有送达消费者之前如果RabbitMQ重启则它能够被恢复。


备注

官方和这个文档都没有讲到rabbitmq的安装配置问题。其实这个会出点小问题。具体的报错信息没有保存,但其实只要不用apt的包(Debian),用源码装最新的包,应该会没什么问题。另外可能需要给自己的测试用的rabbitmq设置下一下的配置:

# /etc/rabbitmq/rabbitmq-env.conf

NODENAME=rabbit@localhost  NODE_IP_ADDRESS=127.0.0.1

另外

用MarkDown写文章有好几个问题,一个是很多符号会被当作是语法元素,而是图片的确慢了些,而且排版有点问题。

【完】

ottocho

2013.03.09


Til next time,
at 09:59

scribble

Home About GitHub