時間のかかる処理をタスクキューで分割する


GoogleAppEngineでは、時間のかかる処理を分割して平行実行するためにTaskQueue(タスクキュー)機能が用意されています。今回はタスクキューを利用した並列化を紹介します。

他のWebサービスとの連携など負荷制御が必要なケースや、実行時まで処理時間がわからないタスクのために利用すると良いでしょう。

たとえばTwitterへのtweet投稿などでは連続動作させると意図せず、相手サーバーへのDoS攻撃になってしまいます。また登録されているメールアドレス全てにメールを送信する、など
データベースの登録状況に依存する処理の場合、だんだんと処理時間が伸びてしまうため並列化が必要になります。

タスクキューを利用する方法はとても簡単です。サンプルコードは続きから。

Queueを定義する

タスクキューに登録するQueueは、queue.xmlで実行間隔を制御できます。queue.xmlを作り、キューの設定を確認してみましょう。
■war/WEB-INF/queue.xml

<queue-entries>
  <queue>
    <name>process</name>
    <rate>2/m</rate>
    <bucket-size>4</bucket-size>
  </queue>
</queue-entries>

4行目のrate要素はタスクを処理する速度です。[2/m]は1分間に2つのキューが処理できます。
5行目のバケットサイズは同時に実行できるタスクの最大値です。ここでは4つ以上のタスクが同時に実行されないように設定しています。
タスクキューの設定は処理に応じて最適な値を探して見てください(処理内容ごとに適切な値が異なるため、一律にこうしておけばよい、という答えがありません)。
また、queue-entries要素は複数のqueue要素を持つことが可能です。

タスクキューの概念

以下の図はキュー到着とバケットサイズの関係を表したものです。
タスクキューに、処理が必要なタスクが追加されたケースを考えてみましょう。バケットに十分が余裕がある時に2つのタスクが到着すると待ち時間なしで実行でき、代わりにバケットは3から1へと少なくなります。

バケットが1のときに2つのタスクが到着した場合、バケットが回復するまで後から到着したタスクが実行されないことになります。

タスクキューのrate

rate要素はタスクキューの同時実行できる最大値です。rateが[2/m]の場合、実行はどんなに早くても1分に2回までに制限されます。
タスクキューは無制限に溜まるのではなく、図のようにbucket-sizeによって上限を設定することが出来ます。

bucket-size要素=待ち時間(wait)なしに実行できる最大タスク数ということに注意して負荷を確認してください。
bucket-sizeを超えるタスクが到着した場合、待ち状態のタスクとして実行が保留されます。その後、バケットが追加されたタイミング([2/m]では1分につき2つの速度、10個保留されていれば、消化にかかる時間は5分)でタスクは消化されます。

Queueを実行する

タスクキューの定義が完了したら、実際にキューにタスクを登録してみましょう。
次のソースコードでは異なる方法で2つのタスクを作成してキューに登録しています。
(タスクとしてURL “/taskqueue” に割り当てられたサーブレットを実行しています)。
■src/MainServlet.java

@SuppressWarnings("serial")
public class MainServlet extends HttpServlet {
	public void doGet(HttpServletRequest req, HttpServletResponse resp)
			throws IOException {

		Queue queue = QueueFactory.getDefaultQueue();
		queue.add(TaskOptions.Builder.withUrl("/taskqueue"));

		//詳細を指定してPost
		queue = QueueFactory.getQueue("process");
		TaskOptions to = TaskOptions.Builder.withUrl("/taskqueue").param("name", "value");
		queue.add(to.method(Method.POST));
	}
}

6行目:QueueFactoryクラスのgetDefaultQueueメソッドを使ってデフォルトのqueueを作成します(queue.xmlが無い場合はGAEのdefault設定[1秒につき5タスク]で動作します)
10行目:タスクキューを指定する場合は、QueueFactoryクラスのgetQueueメソッドで名前(ここではprocess)を指定してキューを取得します(queue.xmlで定義した名前です)
11行目:タスクを呼び出すインスタンスを生成します。パラメータ等は自由に設定できます。
12行目:タスクキューに処理が登録され、非同期に実行されます。サンプルではPOSTメソッドを利用しており、これはタスクキューを使う際の標準設定です(11行目で設定したパラメータも引き渡されます)。

以上、お疲れ様でした。

One Comment