scribble

ottocho's blog

Home About GitHub

09 Mar 2013
RabbitMQ Tutorial 2



Preface

As the preface to Part 1 explained, Part 1 covered Hello World, Work Queues, and Publish/Subscribe; Part 2 covers Routing, Topics, and RPC. This is Part 2.


Routing

This example builds on the previous one (publish/subscribe).

The previous example (the publish/subscribe chapter) was an unfiltered broadcast. This time we want to deliver certain messages to certain consumers, rather than broadcasting everything.

Bindings

A binding is a relationship between an exchange and a queue. In other words, the queue is interested in messages from this exchange.

channel.queue_bind(exchange=exchange_name, queue=queue_name)

When creating a binding you can also pass a routing_key argument. What it means depends on the exchange type: a fanout exchange simply ignores it (a broadcast has no use for a routing key).

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

Direct exchange

The logging system in the previous example was a mere relay that broadcast every message. Let's strengthen it by filtering the logs; for example, we may want to receive only the error messages a script emits. A fanout exchange clearly can't do that: all it can do is broadcast blindly.

This time we use a direct exchange. Its routing algorithm is simple: a message is delivered to the queues whose binding key exactly matches the message's routing key.

For example, consider the figure below:

(figure: direct-exchange)

In the figure, exchange X has two queues bound to it, Q1 and Q2. Q1 is bound with the key 'orange'; Q2 has two bindings and therefore two routing keys, 'black' and 'green'. A message with routing key 'orange' is thus delivered to Q1, while messages with 'black' or 'green' go to Q2. Messages with any other routing key are discarded.
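The rule above can be sketched in plain Python, without a broker. This is an illustration only; the binding table and the queue names Q1/Q2 simply mirror the figure:

```python
from collections import defaultdict

# binding-key -> queues, mirroring the figure: Q1 bound with 'orange',
# Q2 bound with both 'black' and 'green'
bindings = defaultdict(list)
for queue, key in [("Q1", "orange"), ("Q2", "black"), ("Q2", "green")]:
    bindings[key].append(queue)

def route_direct(routing_key):
    # a direct exchange delivers to every queue whose binding key equals
    # the message's routing key; no match means the message is dropped
    return bindings.get(routing_key, [])

print(route_direct("orange"))  # ['Q1']
print(route_direct("green"))   # ['Q2']
print(route_direct("red"))     # [] - discarded
```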

Multiple bindings

Of course, an exchange can also bind several queues with the same routing key. The figure should make this clear enough:

(figure: direct-exchange-multiple)

Emitting logs

In this example we use the log level (info, warning, or error) as the routing key, so a receiving script can pick which messages to receive by level.

First, the sending side.

As always, start by declaring an exchange (a name and a type):

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

Then publish the message (severity is one of 'info', 'warning', 'error'):

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

Subscribing

Receiving works much as before; the difference is that the queue must be bound to the exchange with the routing keys we care about:

result = channel.queue_declare(exclusive=True)  # an anonymous, auto-named queue, as before
queue_name = result.method.queue
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

Sample

The code for emit_log_direct.py:

#!/usr/bin/env python

import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()

The code for receive_logs_direct.py:

#!/usr/bin/env python

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]

if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
                         (sys.argv[0],)
    sys.exit(1)

# one loop, one binding per severity: the same queue is bound to the
# 'direct_logs' exchange once for each routing key in severities
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

Receive warning and error logs:

$ python receive_logs_direct.py warning error > logs_from_rabbit.log

Receive all three levels:

$ python receive_logs_direct.py info warning error

Send an error log:

$ python emit_log_direct.py error "Run. Run. Or it will explode."

Topic

So far we have seen the fanout exchange, which broadcasts, and the direct exchange, which delivers a message to the queues whose binding key exactly matches its routing key. That's still limiting: we can't route on multiple criteria at once.

Let's strengthen the example by routing messages on both the log level (severity) and the source of the message, the way syslog does: syslog routes log output by level (info/warn/crit, etc.) and facility (auth/cron/kern, etc.).

To upgrade our logging system to that routing scheme, we need a topic exchange.

The topic exchange itself is easy to understand; what trips people up is its key matching. That's why I've appended an excerpt from someone else's write-up below, and translated less of the official text.

Introduction

Messages sent to a topic exchange can't have an arbitrary routing_key - it must be a list of words, delimited by dots. The words can be anything, but usually they specify some features connected to the message. A few valid routing key examples: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit". There can be as many words in the routing key as you like, up to the limit of 255 bytes. The binding key must also be in the same form.

The logic behind the topic exchange is similar to a direct one - a message sent with a particular routing key will be delivered to all the queues that are bound with a matching binding key. However there are two important special cases for binding keys: * (star) can substitute for exactly one word; # (hash) can substitute for zero or more words.

See the figures:

(figures: topics, bindings)

In this example, we're going to send messages which all describe animals. The messages will be sent with a routing key that consists of three words (two dots). The first word in the routing key will describe a celerity, the second a colour and the third a species: "<celerity>.<colour>.<species>".

Create three bindings:

queue Q1 is bound with the key *.orange.*

queue Q2 is bound with the keys *.*.rabbit and lazy.#

It should now be obvious where keys such as quick.orange.rabbit, lazy.orange.elephant, and lazy.brown.fox go. Keys like orange or quick.orange.male.rabbit, on the other hand, match no binding in this example, so those messages are discarded.

lazy.orange.male.rabbit, however, matches lazy.# and goes to Q2.

Topic exchange

With these two wildcards, a topic exchange can do the work of both fanout and direct:

if a queue is bound with the key #, it receives all messages regardless of their routing key, just like fanout;

if a binding key uses neither * nor #, the topic exchange behaves just like a direct one.

Putting it all together

The sample code, emit_log_topic.py:

#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()

receive_logs_topic.py:

#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

Receive all logs:

python receive_logs_topic.py "#"

Receive all logs from "kern":

python receive_logs_topic.py "kern.*"

Receive all critical logs:

python receive_logs_topic.py "*.critical"

And of course you can combine them (multiple bindings):

python receive_logs_topic.py "kern.*" "*.critical"

Emit a "kern.critical" message:

python emit_log_topic.py "kern.critical" "A critical kernel error"

The official programs above take the keys from the command line, so you can experiment on your own.

Topic keys can be puzzling at first: the matching is not a regular expression (why isn't it?!), which makes it a bit awkward to reason about.

Below is an excerpt from someone else's write-up; the original is here.

A topic exchange handles keys like this:

  1. . (dot) splits the routing key into parts (understanding this notion of a part is the key)
  2. * (star) matches exactly one whole part
  3. # (hash) matches zero or more parts
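As a sanity check for these rules, here is a small pure-Python sketch of the matching. This is my own illustration, not RabbitMQ's implementation, and it glosses over empty-segment edge cases (such as an empty routing key):

```python
def topic_matches(binding_key, routing_key):
    """Match a routing key against a binding key the way a topic
    exchange does: '.' splits into parts, '*' matches exactly one
    part, '#' matches zero or more parts."""
    return _match(binding_key.split('.'), routing_key.split('.'))

def _match(pattern, words):
    if not pattern:
        return not words              # both exhausted -> match
    head, rest = pattern[0], pattern[1:]
    if head == '#':
        # '#' may consume anywhere from 0 to all remaining parts
        return any(_match(rest, words[i:]) for i in range(len(words) + 1))
    if not words:
        return False
    if head == '*' or head == words[0]:
        return _match(rest, words[1:])
    return False

assert topic_matches("a.#", "a.bc.bc")
assert not topic_matches("a.*", "a.bc.a")
assert topic_matches("a.*.#", "a.b")   # '*' eats 'b', '#' eats nothing
```

The assertions reproduce lines 3, 12 and 28 of the Erlang session below; note that a wildcard only ever matches a whole part, never a fragment of one (which is why "a.a*" never matches "a.ac").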

Some examples:

Eshell V5.9  (abort with ^G)
1> rabbit_exchange_type_topic:topic_matches(<<"a.#">>,<<"a.b">>).
true
2> rabbit_exchange_type_topic:topic_matches(<<"a.#">>,<<"a.bc">>).
true
3> rabbit_exchange_type_topic:topic_matches(<<"a.#">>,<<"a.bc.bc">>).
true
4> rabbit_exchange_type_topic:topic_matches(<<"a.#">>,<<"a1.b">>).
false
5> rabbit_exchange_type_topic:topic_matches(<<"b.a.#">>,<<"a1.b">>).
false
6> rabbit_exchange_type_topic:topic_matches(<<"b.a.#">>,<<"a.b">>).
false
7> rabbit_exchange_type_topic:topic_matches(<<"a.*">>,<<"a.b">>).
true
8> rabbit_exchange_type_topic:topic_matches(<<"a.*">>,<<"a.bc">>).
true
9> rabbit_exchange_type_topic:topic_matches(<<"a.a*">>,<<"a.bc">>).
false
10> rabbit_exchange_type_topic:topic_matches(<<"a.a*">>,<<"a.ac">>).
false
11> rabbit_exchange_type_topic:topic_matches(<<"a.a#">>,<<"a.ac">>).
false
12> rabbit_exchange_type_topic:topic_matches(<<"a.*">>,<<"a.bc.a">>).
false
13> rabbit_exchange_type_topic:topic_matches(<<"a.*.*">>,<<"a.bc.a">>).
true
14> rabbit_exchange_type_topic:topic_matches(<<"a.b*">>,<<"a.bc">>).
false
15> rabbit_exchange_type_topic:topic_matches(<<"a.*.*">>,<<"a.b*">>).
false
16> rabbit_exchange_type_topic:topic_matches(<<"a.*">>,<<"a.b*">>).
true
17> rabbit_exchange_type_topic:topic_matches(<<"a.b*">>,<<"a.b*">>).
true
18> rabbit_exchange_type_topic:topic_matches(<<"*.a">>,<<"a.a">>).
true
19> rabbit_exchange_type_topic:topic_matches(<<"*.a">>,<<"a.a.b">>).
false
20> rabbit_exchange_type_topic:topic_matches(<<"*.a.b">>,<<"a.a">>).
false
21> rabbit_exchange_type_topic:topic_matches(<<"#.a">>,<<"a.a.b">>).
false
22> rabbit_exchange_type_topic:topic_matches(<<"#.a">>,<<"a.a">>).
true
23> rabbit_exchange_type_topic:topic_matches(<<"#.a">>,<<"a.a.a">>).
true
24> rabbit_exchange_type_topic:topic_matches(<<"a.*.a">>,<<"a.a.a">>).
true
25> rabbit_exchange_type_topic:topic_matches(<<"a.*a.a">>,<<"a.aa.a">>).
false
26> rabbit_exchange_type_topic:topic_matches(<<"*">>,<<"a.aa.a">>).
false
27> rabbit_exchange_type_topic:topic_matches(<<"*">>,<<"a">>).
true
28> rabbit_exchange_type_topic:topic_matches(<<"a.*.#">>,<<"a.b">>).
true
29> rabbit_exchange_type_topic:topic_matches(<<"a.*.#">>,<<"a.b.c">>).
true
30> rabbit_exchange_type_topic:topic_matches(<<"*.#">>,<<"a.b.c">>).
true
31> rabbit_exchange_type_topic:topic_matches(<<"*.#">>,<<"a.b.c">>).
true
32> rabbit_exchange_type_topic:topic_matches(<<"*">>,<<"">>).
false
33> rabbit_exchange_type_topic:topic_matches(<<"#.*">>,<<"..">>).
false
34> rabbit_exchange_type_topic:topic_matches(<<"a.*.#">>,<<"a.#">>).
true

RPC

This time we'll build a Remote Procedure Call (RPC) system: run a function on a remote machine and get its result back.

We'll use RabbitMQ to build a simple RPC service whose only job is to return Fibonacci numbers.

Client interface

To illustrate how an RPC service could be used we’re going to create a simple client class. It’s going to expose a method named call which sends an RPC request and blocks until the answer is received:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)

About RPC

The problem with RPC is that the programmer making a call may not know whether it is a fast local function or a slow remote one. That kind of confusion produces unpredictable systems and problems that are very hard to debug. Misused, RPC doesn't simplify software; it yields code that is harder to maintain.

With that in mind, some advice on using RPC:

  1. Make sure it is obvious which function calls are local and which are remote.
  2. Document your system; make the dependencies between components clear.
  3. Handle error cases: how should the client react when the RPC server is down?

If RPC causes too many of these problems, use an "asynchronous pipeline" instead, in which results are asynchronously pushed to the next computation stage.

Callback queue

In general doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response the client needs to send a 'callback' queue address with the request. Let's try it:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                      ),
                      body=request)

Message properties

The AMQP protocol predefines 14 message properties. Apart from the few below, most are rarely needed:

  • delivery_mode: marks a message as persistent (with a value of 2) or transient (any other value). You saw it used in the work-queues example.
  • content_type: the mime-type of the encoding, much like HTTP's Content-Type header. For example, JSON-encoded data should be marked application/json.
  • reply_to: commonly used to name a callback queue.
  • correlation_id: useful to correlate RPC responses with requests.

Correlation id

In the method presented above we suggest creating a callback queue for every RPC request. That’s pretty inefficient, but fortunately there is a better way - let’s create a single callback queue per client. That raises a new issue, having received a response in that queue it’s not clear to which request the response belongs. That’s when the correlation_id property is used. We’re going to set it to a unique value for every request. Later, when we receive a message in the callback queue we’ll look at this property, and based on that we’ll be able to match a response with a request. If we see an unknown correlation_id value, we may safely discard the message - it doesn’t belong to our requests.

You may ask, why should we ignore unknown messages in the callback queue, rather than failing with an error? It’s due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment message for the request. If that happens, the restarted RPC server will process the request again. That’s why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.

Sample

(figure: rpc)

Our RPC will work like this:

  • When the Client starts up, it creates an anonymous exclusive callback queue.
  • For an RPC request, the Client sends a message with two properties: reply_to, which is set to the callback queue and correlation_id, which is set to a unique value for every request.
  • The request is sent to an rpc_queue queue.
  • The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the reply_to field.
  • The client waits for data on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request it returns the response to the application.
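The correlation-id bookkeeping in the last two steps can be sketched without a broker. This is an in-memory stand-in of my own, not pika code; the point is only the match-or-discard logic:

```python
import uuid

pending = {}  # correlation_id -> the request it belongs to

def send_request(n):
    # every request gets a unique correlation_id, remembered by the client
    corr_id = str(uuid.uuid4())
    pending[corr_id] = n
    return corr_id

def on_response(corr_id, body):
    # a response is accepted only if its correlation_id matches a pending
    # request; unknown ids (e.g. duplicate replies) are safely discarded
    if corr_id not in pending:
        return None
    del pending[corr_id]
    return body

corr = send_request(30)
assert on_response(corr, 832040) == 832040        # fib(30), accepted
assert on_response(corr, 832040) is None          # duplicate, ignored
```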

Putting it all together

rpc_server.py

#!/usr/bin/env python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
                                            host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')

# deliberately the most inefficient fib possible, for demonstration only
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)
    print " [.] fib(%s)"  % (n,)
    response = fib(n)
    # an empty exchange name selects the default exchange, which routes
    # a message to the queue named by routing_key (if that queue exists).
    # the server takes the queue name from the request's reply_to property,
    # so publishing via the default exchange sends the result back to the
    # requester's temporary queue
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                     body=str(response))
    # acknowledge the message so the broker won't redeliver the request
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

# the logic is simple: when a message arrives on 'rpc_queue' the server
# calls on_request, which reads the number from the message body and
# computes the result with fib
channel.basic_consume(on_request, queue='rpc_queue')
print " [x] Awaiting RPC requests"
channel.start_consuming()

rpc_client.py:

#!/usr/bin/env python
# coding:utf8

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
        self.channel = self.connection.channel()
        result_queue = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result_queue.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    # this is the RPC method proper
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        # the client sends a request (at bottom just a message),
        # attaching the reply_to and correlation_id properties
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                  body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

def main():
    fibonacci_rpc = FibonacciRpcClient()
    print " [x] Requesting fib(30)"
    response = fibonacci_rpc.call(30)
    print " [.] Got %r" % (response,)

if __name__ == '__main__':
    main()

Our RPC service is now ready. We can start the server:

$ python rpc_server.py

To request a fibonacci number run the client:

$ python rpc_client.py

The presented design is not the only possible implementation of a RPC service, but it has some important advantages:

  • If the RPC server is too slow, you can scale up by just running another one. Try running a second rpc_server.py in a new console.
  • On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queue_declare are required. As a result the RPC client needs only one network round trip for a single RPC request.

Summary

Here is my understanding of the example, spelled out in detail.

When the server starts, it declares a queue, rpc_queue, and becomes its consumer. The messages the server receives on rpc_queue are the requests sent by clients, and the handler the server registers on that queue, on_request, is the RPC method itself. When on_request handles a message (i.e. a request), it takes two properties from it: correlation_id, used for matching, and reply_to, the routing key to use when sending back the result. Once it has computed the result, on_request wraps it in a message, attaches the client's correlation_id (which is what lets the client, on receiving the result, match it to one of its own requests), and publishes it to the queue the client declared for its results (the reply_to property of the client's message). The server publishes with an empty exchange parameter, i.e. the default exchange, which tries to deliver a message to the queue whose name equals the routing key. Since the server got a queue name from the request's reply_to, publishing with routing_key set to that value sends the result straight to the queue where the client is waiting.

The client implements a very basic class, FibonacciRpcClient, that wraps a single method call. (Which also means that in main, without reading the source, you cannot tell whether the call is local or remote; this deserves serious thought, and calls for good naming conventions and project documentation.) result_queue is the anonymous queue the client declares (a temporary, exclusive, anonymous queue guarantees the client receives only the result messages meant for it), and the client is its consumer. The handler registered on result_queue processes the results of the client's requests; here it simply validates the result, the validation being the correlation_id check described above. The FibonacciRpcClient object issues a request through its call method (for RPC, issuing a request really amounts to sending a message to whoever handles it, with send/receive separation providing the decoupling), and the request message carries the reply_to and correlation_id properties. After sending, the client waits, and in due course the server publishes the result to the anonymous queue named in the request. The client checks the correlation_id on the result to confirm it, which completes one RPC.

Implementing RPC with RabbitMQ also preserves the information hiding between client and server: the client depends not on a particular server but on a particular message, so when several equivalent servers are available, whether any one of them is healthy has no effect on the client's state.

To sum up, implementing RPC with RabbitMQ also gets you, in effect:

  1. Fault tolerance: one server crashing does not affect the client.
  2. Decoupling from any particular wire protocol or interface; everything travels as AMQP messages.
  3. Load balancing across multiple RPC servers, handled by RabbitMQ.

Notes

These two posts turned out to be rather long.

[End]

ottocho

2013.03.09


Til next time,
at 11:37
