2012/01/18

DebianでPython + RabbitMQ( 1 )

Postfix + procmail + PHPで実装していたサービスを拡張することになった。
現状の問題点としては、
  • procmail駆動の為、受信ごとにPHPプロセスが立ち上がる
  • 冗長性や高負荷を考慮したメール処理の多重化が困難
  • PHPでバッチ処理って...
など色々とある。
そこで、procmail + Python( Queue + xmlrpclibXMLRPCServer )を検証実装していた。
とはいえ、単一サーバーのprocmail処理をプールする程度で、
本質的に冗長性や可用性を確保するには至らなかった。
そんな訳で、Postfix + Python + RabbitMQで実装することにした。

まずは、RabbitMQをインストールする
echo 'deb http://www.rabbitmq.com/debian/ testing main' >> /etc/apt/source.lst
wget -q http://www.rabbitmq.com/rabbitmq-signing-key-public.asc -O - | apt-key add -
apt-get update
apt-get install rabbitmq-server
次に、Python用モジュール(pika)をインストールする
apt-get install python-setuptools
easy_install pika
pikaのサンプルソースを実行してみる。
from pika.adapters import SelectConnection

# Create our connection object
connection = SelectConnection()

try:
    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()

これを実行して、 「AttributeError: 'ProtocolHeader' object has no attribute 'channel_number'」 といったエラーが出力される場合は、RabbitMQが2.0未満の可能性が高い。
稼働ディストロに応じた本家リポジトリなどから再インストールすると解決できる。

次にRabbitMQに掲載されている単純なProducer/Consumerを試す。
Producer( 「deliver_mode=2」でDisk出力 )[ producer.py ]
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()

Consumer( 「prefetch_count=1」でマルチキャスト。指定しないとブロードキャスト )[ consumer.py ]
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
おおまかには、この構成で実装する。

但し実戦投入するには、
  • クラスタリング構成時に自動的な縮退運転への切り替え
  • スプリットブレインなどの障害時のキュー復元
などの課題がある。
この辺りを次回検証する。

0 件のコメント: