Go for it!

モーターサイクルと自転車とキャンプの日々。

Debian Lennyでcelery/rabbitmqを動かすメモ

セロリ、ニンジン、ウサギが好きです(嘘です)。

loc2go.comはTCX→KMLのデータコンバートをオンライン=同期で行っているのですが、サーバは重いしクライアントを待たせすぎ(使ってる方、重くてスイマセン)なのでジョブキュー設計を導入したいと考えてました。

で、いろいろ調べてみるとceleryとRabbitMQが面白そうなのでちょっと試してみた。

ざっくりとしたシステム概要は、celeryがpythonで書かれたキューマネージャ、バックエンドにはerlangで書かれたRabbitMQ+適当なRDBMSが置かれる。タスク依頼はAMQPを使ってceleryに依頼し、RabbitMQがキューをワーカへ投げる(という理解で合ってるんだろうか)。

vmwareにDebian Lennyの仮想マシンを作ってちょこちょこと。

aptで以下を入れる。

  • python2.5
  • python-setuptools
  • python-django
  • erlang-nox

コマンドだとこんなかんじ。

# aptitude install python2.5 python-setuptools python-django erlang-nox

RabbitMQはdebがあったので/etc/apt/source.listに以下を追加。

deb http://www.rabbitmq.com/debian/ testing main

aptでガツンと入れる。

# aptitude install rabbitmq-server

チュートリアル通りにRabbitMQの認証設定をする。

$ rabbitmqctl add_user myuser mypassword
$ rabbitmqctl add_vhost myvhost
$ rabbitmqctl set_permissions -p myvhost myuser “” “.” “.

celeryとmultiprocessing入れる(pip入ってなかったらここから入れてください)。

# pip install celery

pip install multiprocessing

djangoのプロジェクトを作る。

$ mkdir ~/project/
$ cd ~/project/
$ django-admin.py startproject celerytest
$ cd celerytest

INSTALLED_APPSにceleryを追加。データベースは適当に設定します(私はsqlite3を使いました)。

[code lang=“python”] INSTALLED_APPS = ( ‘django.contrib.auth’, ‘django.contrib.contenttypes’, ‘django.contrib.sessions’, ‘django.contrib.sites’, ‘celery’, # これを追加。 ‘mytask’, # これも追加(後述) } [/code]

AMQPサーバの設定も加えます。

[code lang=“python”] AMQP_SERVER=‘127.0.0.1’ AMQP_PORT=5672 AMQP_USER=‘myuser’ AMQP_PASSWORD=‘mypassword’ AMQP_VHOST=‘myvhost’ [/code]

で、データベースを作るとジョブ管理用のスキーマができあがります。

$ python manage.py syncdb

非同期タスクはcelery.task.Taskを継承して作る。タスクはdjangoアプリケーションで作成する必要があるようなので、mytaskというアプリケーションを作る。

$ python manage.py startapp mytask

mytask/tasks.pyに以下を書く。内容は「5秒待ってhogehogeと返す」だけ。

[code lang=“python”] from celery.task import Task from celery.registry import tasks import time

class MyTask(Task): name=‘MyTask’

def run(self, **kwargs): time.sleep(5) return ‘hogehoge’

tasks.register(MyTask) [/code]

celerydはmanage.pyから起動する。タスクはceleryd起動時に読み込まれる。

$ python manage.py celeryd

アプリケーションがあればipythonからタスクを呼び出せる。tasksをインポート。

[code lang=“python”] from mytask import tasks [/code]

delayメソッドを呼び出すとタスクが走る(他にもあるけどよく知らん)。

[code lang=“python”] res=tasks.MyTask.delay() [/code]

resにAsyncResultオブジェクトが入る。

[code lang=“python”] print res <AsyncResult: c50cc259-ab02-46be-8ec6-e5c63ce4e1e2> [/code]

タスクが完了しているかどうかはsuccessful()メソッドを使う。完了していなければFalseが返る。 [code lang=“python”] print res.successful() False [/code]

5秒後に再度実行するとTrueが返る、はず。 [code lang=“python”] print res.successful() True [/code]

値はget()メソッドを使う。 [code lang=“python”] print res.get() ‘hogehoge’ [/code]

ちょっと駆け足でしたがPython Hack-a-thon #2で熱いネタだったのでとりあえず公開。突っ込み歓迎。

もしmultiprocessing周りでエラーが出たら、以下のURLのパッチを適用する。 http://robertpogorzelski.com/blog/2009/09/10/rabbitmq-celery-and-django/

is_done()なくなるよ!と指摘をいただいたので修正した。(2009/11/15 1:09) [ad#text_wide]