現状の問題点としては、
- procmail駆動の為、受信ごとにPHPプロセスが立ち上がる
- 冗長性や高負荷を考慮したメール処理の多重化が困難
- PHPでバッチ処理って...
そこで、procmail + Python( Queue + xmlrpclib + XMLRPCServer )を検証実装していた。
とはいえ、単一サーバーのprocmail処理をプールする程度で、
本質的に冗長性や可用性を確保するには至らなかった。
そんな訳で、Postfix + Python + RabbitMQで実装することにした。
まずは、RabbitMQをインストールする
次に、Python用モジュール(pika)をインストールするecho 'deb http://www.rabbitmq.com/debian/ testing main' >> /etc/apt/source.lst wget -q http://www.rabbitmq.com/rabbitmq-signing-key-public.asc -O - | apt-key add - apt-get update apt-get install rabbitmq-server
pikaのサンプルソースを実行してみる。apt-get install python-setuptools easy_install pika
from pika.adapters import SelectConnection # Create our connection object connection = SelectConnection() try: # Loop so we can communicate with RabbitMQ connection.ioloop.start() except KeyboardInterrupt: # Gracefully close the connection connection.close() # Loop until we're fully closed, will stop on its own connection.ioloop.start()
これを実行して、 「AttributeError: 'ProtocolHeader' object has no attribute 'channel_number'」 といったエラーが出力される場合は、RabbitMQが2.0未満の可能性が高い。
稼働ディストロに応じた本家リポジトリなどから再インストールすると解決できる。
次にRabbitMQに掲載されている単純なProducer/Consumerを試す。
Producer( 「deliver_mode=2」でDisk出力 )[ producer.py ]
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close()
Consumer( 「prefetch_count=1」でマルチキャスト。指定しないとブロードキャスト )[ consumer.py ]
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()おおまかには、この構成で実装する。
但し実戦投入するには、
- クラスタリング構成時に自動的な縮退運転への切り替え
- スプリットブレインなどの障害時のキュー復元
この辺りを次回検証する。
0 件のコメント:
コメントを投稿