現状の問題点としては、
- procmail駆動の為、受信ごとにPHPプロセスが立ち上がる
- 冗長性や高負荷を考慮したメール処理の多重化が困難
- PHPでバッチ処理って...
そこで、procmail + Python( Queue + xmlrpclib + XMLRPCServer )を検証実装していた。
とはいえ、単一サーバーのprocmail処理をプールする程度で、
本質的に冗長性や可用性を確保するには至らなかった。
そんな訳で、Postfix + Python + RabbitMQで実装することにした。
まずは、RabbitMQをインストールする
次に、Python用モジュール(pika)をインストールするecho 'deb http://www.rabbitmq.com/debian/ testing main' >> /etc/apt/source.lst wget -q http://www.rabbitmq.com/rabbitmq-signing-key-public.asc -O - | apt-key add - apt-get update apt-get install rabbitmq-server
pikaのサンプルソースを実行してみる。apt-get install python-setuptools easy_install pika
from pika.adapters import SelectConnection
# Create our connection object
connection = SelectConnection()
try:
# Loop so we can communicate with RabbitMQ
connection.ioloop.start()
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Loop until we're fully closed, will stop on its own
connection.ioloop.start()
これを実行して、 「AttributeError: 'ProtocolHeader' object has no attribute 'channel_number'」 といったエラーが出力される場合は、RabbitMQが2.0未満の可能性が高い。
稼働ディストロに応じた本家リポジトリなどから再インストールすると解決できる。
次にRabbitMQに掲載されている単純なProducer/Consumerを試す。
Producer( 「deliver_mode=2」でDisk出力 )[ producer.py ]
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
print " [x] Sent %r" % (message,)
connection.close()
Consumer( 「prefetch_count=1」でマルチキャスト。指定しないとブロードキャスト )[ consumer.py ]
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
おおまかには、この構成で実装する。但し実戦投入するには、
- クラスタリング構成時に自動的な縮退運転への切り替え
- スプリットブレインなどの障害時のキュー復元
この辺りを次回検証する。
0 件のコメント:
コメントを投稿