scribble

ottocho's blog


09 Mar 2013
RabbitMQ Tutorial 1


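Preface

These are notes I took while studying the official documentation. Combined with the official docs, they add up to an entry-level guide to RabbitMQ. It is not really a translation, since I think many sentences read better untranslated, and it is even less an original work. All the code has been debugged and commented.

The original tutorial uses six articles as case studies, moving gradually from the simplest one-to-one model to a reasonably complete RPC example. Here I put them into two posts, split into six sections.

The original documentation is here: getstarted.html

Part one covers Hello World, Work Queues, and Publish/Subscribe. Part two covers Routing, Topics, and RPC. This post is part one.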


RabbitMQ

rabbitmq_logo

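Conceptually, RabbitMQ addresses two problems between applications: how they connect and how they scale. Sending and receiving are isolated from each other: the sender does not know who will eventually receive a message, and the receiver does not need to care who sent it. Because the two sides are isolated, messaging is inherently asynchronous, and this isolation decouples the applications from one another. RabbitMQ plays the role of a router sitting between applications. As for scale, once applications no longer depend on each other directly, they are easier to extend at the business level. (from here)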


Hello World

from here

RabbitMQ is a message broker.

Glossary

Let's first define the terms used in RabbitMQ.

Producing: sending message; Producer: the program that sends messages

producer

Queue: the name for a mailbox; it lives inside RabbitMQ. It’s essentially an infinite buffer. Many producers can send messages that go to one queue, and many consumers can try to receive data from one queue.

queue

Consuming: receiving messages, the counterpart of producing. Consumer: a program that mostly waits to receive messages.

consumer

This is the simplest produce-queue-consume model. Talk is cheap; here is the code.

Example

Sending.py: this script illustrates the simplest one-to-one message delivery.

#!/usr/bin/env python
# coding:utf8

import pika
import time

credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

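# Create a queue.
# queue_declare is idempotent: if the queue already exists this is only a declaration; otherwise the queue is created.
channel.queue_declare(queue='hello')

# An exchange decides which queue a message should go to.
# A binding is a rule on the exchange; the routing key is matched against it to pick the queue.
# Passing an empty string as the exchange selects the default exchange.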

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

Receiving.py

#!/usr/bin/env python
# coding:utf8

import pika

# Set up the connection, same as in Sending.py
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

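# Declare the queue here as well; queue_declare is idempotent, so it is safe whichever script starts first.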
channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

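# no_ack is covered in detail in the previous intro post.
# With no_ack=True, the consume request carries a no_ack flag telling the AMQP server not to wait for acknowledgments.
# Why acks matter: with no_ack=False, RabbitMQ redelivers an unacknowledged message to another consumer
# on this queue when the original consumer's connection dies.
# Also: with no_ack=False you have to send the acknowledgment yourself.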
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

channel.start_consuming()

Work Queues

from here

Work Queue

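A work queue is used to distribute time-consuming tasks among multiple workers. One producer sends messages to the queue, and multiple consumers fetch messages from it. If each task is treated as a message, the queue can be seen as a task queue.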

work-queue

The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to the queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them. This concept is especially useful in web applications where it’s impossible to handle a complex task during a short HTTP request window.

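Example

In this section's example the messages are still plain strings (not images, PDFs, or the like). So we use sleep to simulate computation, with each dot in the message standing for one second of work. For example, "hello..." takes three seconds to process.

In the example below, new_task.py schedules tasks to the work queue (much like the sender in the previous example).

Note that when no exchange is specified, the message is sent to the queue whose name matches routing_key.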

channel.queue_declare(queue='hello')
message = ' '.join(sys.argv[1:]) or "hello world."

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print " [x] Sent '%s'" % (message,)

worker.py: similar to Receiving.py from the previous example, with simulated processing time added to the callback.

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep(body.count('.'))
    print " [x] Done"
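One detail to watch if this callback is run on Python 3: pika hands the body to the callback as bytes, so counting dots needs a bytes pattern (b'.') rather than a str. The helper below is a small sketch of that idea; the name simulated_seconds is hypothetical and not part of the tutorial.

```python
def simulated_seconds(body):
    """Return the seconds of fake work encoded in a message body: one per dot."""
    # Pick a dot of the same type as the body so .count() accepts it.
    dot = b"." if isinstance(body, bytes) else "."
    return body.count(dot)


print(simulated_seconds("hello..."))      # str body
print(simulated_seconds(b"hello....."))   # bytes body, as delivered on Python 3
```

Inside the callback, time.sleep(simulated_seconds(body)) would then behave the same for both body types.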

Task Queue and Round-robin dispatching

By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.

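By default, consumers take turns receiving messages, cycling through the consumers in order.

For a simple task queue, scaling out is easy (though the default configuration does not take load balancing into account).

You can use new_task.py and worker.py to try out this round-robin behavior.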
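To see the dispatch pattern without starting a broker, here is a minimal pure-Python sketch of round-robin assignment. It only models the ordering described above; round_robin_dispatch and the worker names are hypothetical, and nothing here talks to RabbitMQ.

```python
from itertools import cycle


def round_robin_dispatch(messages, workers):
    """Assign each message to the next worker in sequence."""
    assignment = {worker: [] for worker in workers}
    turn = cycle(workers)  # endless worker-1, worker-2, worker-3, worker-1, ...
    for message in messages:
        assignment[next(turn)].append(message)
    return assignment


messages = ["task %d" % i for i in range(1, 7)]
result = round_robin_dispatch(messages, ["worker-1", "worker-2", "worker-3"])
for worker, tasks in result.items():
    print(worker, tasks)
```

Each of the three workers ends up with two of the six tasks, regardless of how long any task takes.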

Message acknowledgment (no_ack)

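If a worker takes a long time to process a message and dies partway through, the message is lost, because with acknowledgments turned off (no_ack=True) RabbitMQ removes a message from memory as soon as it has delivered it to a consumer. If you kill a worker (consumer), any messages it holds but has not yet processed are lost as well.

That is clearly a problem. When a consumer dies, we want its messages to be handed to other consumers. Message acknowledgments make sure messages are not lost. The consumer sends an ack back to RabbitMQ to say that a message has been handled, and RabbitMQ is then free to delete it. If a consumer dies without sending the ack for a message, RabbitMQ redelivers that message to another consumer. This way the message is not lost.

There aren’t any message timeouts. (Surprisingly, there is no way to set a timeout?) RabbitMQ will redeliver the message only when the worker connection dies. It’s fine even if processing a message takes a very, very long time.

Message acknowledgments are turned on by default (no_ack defaults to False).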
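The ack bookkeeping can be sketched with a small in-memory model. This is a toy under stated assumptions, not how RabbitMQ is implemented: ToyQueue and its methods are hypothetical names, and putting unacked messages back at the front of the queue is a simplification.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for a queue with acknowledgments (not RabbitMQ itself)."""

    def __init__(self):
        self.ready = deque()  # messages waiting to be delivered
        self.unacked = {}     # consumer name -> messages delivered but not yet acked

    def publish(self, body):
        self.ready.append(body)

    def deliver(self, consumer):
        body = self.ready.popleft()
        self.unacked.setdefault(consumer, []).append(body)
        return body

    def ack(self, consumer, body):
        # The consumer finished the work, so the message can be dropped for good.
        self.unacked[consumer].remove(body)

    def connection_lost(self, consumer):
        # Everything the dead consumer never acked goes back to the front, in order.
        for body in reversed(self.unacked.pop(consumer, [])):
            self.ready.appendleft(body)


queue = ToyQueue()
queue.publish("first")
queue.publish("second")

queue.deliver("worker-1")          # worker-1 receives "first" ...
queue.connection_lost("worker-1")  # ... and dies before acking it

redelivered = queue.deliver("worker-2")
queue.ack("worker-2", redelivered)
print(redelivered, list(queue.ready), queue.unacked)
```

The message that worker-1 never acknowledged is handed to worker-2 instead of disappearing; only the ack removes it from the bookkeeping.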

Forgotten acknowledgment

It’s a common mistake to miss the basic_ack. It’s an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won’t be able to release any unacked messages.

If too many messages go unacknowledged, RabbitMQ uses more and more memory. To check for this problem, run:

$ rabbitmqctl list_queues name messages_ready messages_unacknowledged

Message durability

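The above covers redelivering messages via acks when a consumer dies. But what happens to the messages if the RabbitMQ server itself goes down? For them to survive, both the queue and the messages must be marked as durable.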

channel.queue_declare(queue='hello', durable=True)

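You cannot change the parameters of an existing queue. If queue_declare is called for an existing queue with different parameters, it returns an error.

To get different parameters, the workaround is to declare a new queue under a different name.

For example (the original queue was named hello):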

channel.queue_declare(queue='task_queue', durable=True)

Then update the queue name in both the producer and the consumer code.

On the producer side, mark messages as persistent by setting the delivery_mode property to 2.

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

Note:

Marking messages as persistent does not fully guarantee that a message won’t be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn’t saved it yet. Also, RabbitMQ doesn’t do fsync(2) for every message; it may be just saved to cache and not really written to the disk. In other words, the persistence guarantee is not strong, but it is better than what a plain task queue gives you. If you need a much stronger guarantee, you can wrap the publishing of each message in a transaction.

Fair dispatch

You might have noticed that the dispatching still doesn’t work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn’t know anything about that and will still dispatch messages evenly (it always splits messages evenly by count, not by workload).

This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn’t look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

prefetch-count

In order to defeat that we can use the basic.qos method with the prefetch_count=1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don’t dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

channel.basic_qos(prefetch_count=1)
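The effect of prefetch_count=1 can be illustrated with a small simulation of the "odd messages heavy, even messages light" case. This is a sketch under simplifying assumptions: all messages are already queued, the costs are made-up numbers in seconds, and the function names are hypothetical. It models only the dispatch policy, not RabbitMQ.

```python
import heapq


def round_robin_load(costs, n_workers):
    """Total busy time per worker when message i goes to worker i % n_workers."""
    load = [0] * n_workers
    for i, cost in enumerate(costs):
        load[i % n_workers] += cost
    return load


def prefetch_one_load(costs, n_workers):
    """Total busy time per worker when each message goes to whichever worker frees up first."""
    load = [0] * n_workers
    # Heap of (time at which the worker becomes idle, worker index).
    free_at = [(0, w) for w in range(n_workers)]
    heapq.heapify(free_at)
    for cost in costs:
        idle_time, worker = heapq.heappop(free_at)
        load[worker] += cost
        heapq.heappush(free_at, (idle_time + cost, worker))
    return load


costs = [5, 1, 5, 1, 5, 1]  # odd messages heavy, even messages light
print(round_robin_load(costs, 2))
print(prefetch_one_load(costs, 2))
```

With blind round-robin one worker does 15 seconds of work and the other 3; when a worker only gets a new message after finishing the previous one, the split becomes 11 and 7.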

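Queue size

If the workload is heavy, the queue can fill up. (The documentation doesn't say anything more useful about this.)

Sample

The full source follows. new_task.py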

#!/usr/bin/env python
# coding:utf8

import sys
import pika
import time

credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "hello world."

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent '%s'" % (message,)
connection.close()

worker.py

#!/usr/bin/env python
# coding:utf8

import pika
import time

credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep(body.count('.'))
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
channel.start_consuming()

Publish/Subscribe

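Differences

Work Queue and Publish/Subscribe are both one-to-many models.

The difference: in a Work Queue each message goes to exactly one consumer (worker), while in the publish/subscribe model the same message is delivered to multiple consumers. At the level of programs both are one-to-many, but at the level of a single message, Work Queue is one message to one consumer, whereas publish/subscribe is one message to many consumers.

The example below uses two programs.

One program emits log messages and the other receives and prints them. In this example every receiver gets every message, so you can run one receiver to write the log to a file and another to view it on screen. In other words, log messages are broadcast to all queues (a fanout exchange).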

Exchanges

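The previous two sections used very simple models. Now let's introduce exchanges. The roles mentioned so far are:

  • producer: sends messages
  • queue: a buffer that stores messages
  • consumer: receives messages

In RabbitMQ, the producer does not send messages directly to a queue. Actually, quite often the producer doesn’t even know if a message will be delivered to any queue at all. Instead, the producer sends messages to an exchange. The exchange receives messages from producers and passes them on to queues according to certain rules. The exchange type determines whether a message goes to one particular queue, to many queues, or is discarded.

In the figure, P is the producer and X is the exchange.

exchanges

The exchange types are: direct, topic, headers and fanout.

Below we look at the fanout type. As the name suggests, a fanout exchange broadcasts each message to every queue it knows about.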

channel.exchange_declare(exchange='logs', exchange_type='fanout')

To list the exchanges:

$ rabbitmqctl list_exchanges
Listing exchanges ...
logs      fanout
amq.direct      direct
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
...done.

The amq.* exchanges, like the default (unnamed) exchange, are created by the broker by default.

Nameless exchange

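The earlier examples never declared or named an exchange, yet messages were still delivered. That worked because they used the default exchange, which is identified by the empty string ("").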

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

The exchange parameter names the exchange. The empty string selects the default exchange: the message is routed to the queue named by routing_key, if such a queue exists.
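The routing rule of the default exchange is simple enough to sketch as a dictionary lookup. This is a toy model only; default_exchange_publish is a hypothetical name, and a plain dict stands in for the broker's queues.

```python
def default_exchange_publish(queues, routing_key, body):
    """Route body to the queue named routing_key; return False if no such queue exists."""
    if routing_key not in queues:
        return False  # no queue with that name, so the message goes nowhere
    queues[routing_key].append(body)
    return True


queues = {"hello": []}  # one declared queue, named hello
print(default_exchange_publish(queues, "hello", "Hello World!"))
print(default_exchange_publish(queues, "missing", "lost"))
print(queues)
```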

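Temporary queues

Here is the scenario for this example:

We want a logger that listens to all messages (that is, logs) and only cares about current logs, not old ones. The queue is declared like this: result = channel.queue_declare(exclusive=True). When the queue parameter is omitted, RabbitMQ generates a random name (such as amq.gen-JzTY20BRgKO-HjmUJj0wLg). With exclusive=True, the queue is deleted once the consumer disconnects.

The queue's name is available as result.method.queue. This is a temporary queue whose name we did not choose ourselves.

Such a queue acts as a relay rather than as storage: it exists only to receive broadcast messages, and it only receives them while a consumer is connected.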

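Bindings

A binding is the relationship between an exchange and a queue; it tells the exchange to route messages to that queue. That relationship between exchange and a queue is called a binding.

bindings

Bind the logs exchange to the anonymous queue declared above.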

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

To list the bindings:

$ rabbitmqctl list_bindings

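Sample code

exchange-sample

This producer is much like the earlier ones. The difference is that it publishes to a named exchange instead of the nameless default exchange. Normally publishing to an exchange requires a routing_key, but a fanout exchange ignores it (it broadcasts to every queue, so no key is needed for routing).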

#!/usr/bin/env python
# emit_log.py

import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
             host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print " [x] Sent %r" % (message,)
connection.close()

The receiver follows.

The messages will be lost if no queue is bound to the exchange yet, but that’s okay for us; if no consumer is listening yet we can safely discard the message.

#!/usr/bin/env python
# receive_logs.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
                                     host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
                   queue=queue_name)
print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

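As shown below, you can start two consumers, which creates two anonymous queues, both bound to the exchange "logs". emit_log sends messages to "logs", and "logs" broadcasts them to the two anonymous queues. Here RabbitMQ acts as a broadcast relay. Each receiver has its own anonymous queue; once a receiver stops, its queue is destroyed.

If no receiver is running, no anonymous queue exists and the exchange is bound to nothing, so incoming messages are simply discarded because there is no queue to hold them. If you need broadcast but cannot accept losing messages like this, declare named queues instead.

Start two consumers in two terminals: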

$ python receive_logs.py > logs_from_rabbit.log
$ python receive_logs.py

Start a producer:

$ python emit_log.py 'message'

Inspect the bindings:

$ rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names.
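The whole fanout flow, including the case where nothing is bound yet, can be modeled in a few lines of plain Python. This is a rough sketch, not the RabbitMQ implementation: ToyFanoutExchange and the amq.gen-A / amq.gen-B queue names are made up for illustration.

```python
class ToyFanoutExchange:
    """In-memory stand-in for a fanout exchange (not the RabbitMQ implementation)."""

    def __init__(self):
        self.bound_queues = {}  # queue name -> messages held by that queue

    def bind(self, queue_name):
        self.bound_queues.setdefault(queue_name, [])

    def unbind(self, queue_name):
        # Models a receiver stopping: its exclusive queue and the binding disappear.
        self.bound_queues.pop(queue_name, None)

    def publish(self, body, routing_key=""):
        # A fanout exchange ignores routing_key and copies the message to every bound queue.
        for messages in self.bound_queues.values():
            messages.append(body)
        return len(self.bound_queues)  # how many queues received a copy


logs = ToyFanoutExchange()
dropped = logs.publish("nobody is listening yet")  # no bindings: the message is discarded

logs.bind("amq.gen-A")  # hypothetical server-assigned names
logs.bind("amq.gen-B")
delivered = logs.publish("info: Hello World!")
print(dropped, delivered, logs.bound_queues)
```

The first message reaches zero queues and is gone; the second is copied into both bound queues, which matches the two bindings listed by rabbitmqctl above.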


Remarks

This post looks rather long.

[End]

ottocho

2013.03.09


Til next time,
at 10:46
