セロリ、ニンジン、ウサギが好きです(嘘です)。
loc2go.comはTCX→KMLのデータコンバートをオンライン=同期で行っているのですが、サーバは重いしクライアントを待たせすぎ(使ってる方、重くてスイマセン)なのでジョブキュー設計を導入したいと考えてました。
で、いろいろ調べてみるとceleryとRabbitMQが面白そうなのでちょっと試してみた。
ざっくりとしたシステム概要は、celeryがpythonで書かれたキューマネージャ、バックエンドにはerlangで書かれたRabbitMQ+適当なRDBMSが置かれる。タスク依頼はAMQPを使ってceleryに依頼し、RabbitMQがキューをワーカへ投げる(という理解で合ってるんだろうか)。
vmwareにDebian Lennyの仮想マシンを作ってちょこちょこと。
aptで以下を入れる。
- python2.5
- python-setuptools
- python-django
- erlang-nox
コマンドだとこんなかんじ。
# aptitude install python2.5 python-setuptools python-django erlang-nox
RabbitMQはdebがあったので/etc/apt/source.listに以下を追加。
deb http://www.rabbitmq.com/debian/ testing main
aptでガツンと入れる。
# aptitude install rabbitmq-server
チュートリアル通りにRabbitMQの認証設定をする。
$ rabbitmqctl add_user myuser mypassword
$ rabbitmqctl add_vhost myvhost
$ rabbitmqctl set_permissions -p myvhost myuser “” “.” “.”
celeryとmultiprocessing入れる(pip入ってなかったらここから入れてください)。
# pip install celery
pip install multiprocessing
djangoのプロジェクトを作る。
$ mkdir ~/project/
$ cd ~/project/
$ django-admin.py startproject celerytest
$ cd celerytest
INSTALLED_APPSにceleryを追加。データベースは適当に設定します(私はsqlite3を使いました)。
[code lang=“python”] INSTALLED_APPS = ( ‘django.contrib.auth’, ‘django.contrib.contenttypes’, ‘django.contrib.sessions’, ‘django.contrib.sites’, ‘celery’, # これを追加。 ‘mytask’, # これも追加(後述) } [/code]
AMQPサーバの設定も加えます。
[code lang=“python”] AMQP_SERVER=‘127.0.0.1’ AMQP_PORT=5672 AMQP_USER=‘myuser’ AMQP_PASSWORD=‘mypassword’ AMQP_VHOST=‘myvhost’ [/code]
で、データベースを作るとジョブ管理用のスキーマができあがります。
$ python manage.py syncdb
非同期タスクはcelery.task.Taskを継承して作る。タスクはdjangoアプリケーションで作成する必要があるようなので、mytaskというアプリケーションを作る。
$ python manage.py startapp mytask
mytask/tasks.pyに以下を書く。内容は「5秒待ってhogehogeと返す」だけ。
[code lang=“python”] from celery.task import Task from celery.registry import tasks import time
class MyTask(Task): name=‘MyTask’
def run(self, **kwargs): time.sleep(5) return ‘hogehoge’
tasks.register(MyTask) [/code]
celerydはmanage.pyから起動する。タスクはceleryd起動時に読み込まれる。
$ python manage.py celeryd
アプリケーションがあればipythonからタスクを呼び出せる。tasksをインポート。
[code lang=“python”] from mytask import tasks [/code]
delayメソッドを呼び出すとタスクが走る(他にもあるけどよく知らん)。
[code lang=“python”] res=tasks.MyTask.delay() [/code]
resにAsyncResultオブジェクトが入る。
[code lang=“python”] print res <AsyncResult: c50cc259-ab02-46be-8ec6-e5c63ce4e1e2> [/code]
タスクが完了しているかどうかはsuccessful()メソッドを使う。完了していなければFalseが返る。 [code lang=“python”] print res.successful() False [/code]
5秒後に再度実行するとTrueが返る、はず。 [code lang=“python”] print res.successful() True [/code]
値はget()メソッドを使う。 [code lang=“python”] print res.get() ‘hogehoge’ [/code]
ちょっと駆け足でしたがPython Hack-a-thon #2で熱いネタだったのでとりあえず公開。突っ込み歓迎。
もしmultiprocessing周りでエラーが出たら、以下のURLのパッチを適用する。 http://robertpogorzelski.com/blog/2009/09/10/rabbitmq-celery-and-django/
– is_done()なくなるよ!と指摘をいただいたので修正した。(2009/11/15 1:09) [ad#text_wide]