Task Queue REST APIを使って、GCEからPullQueueをさわってみる!

  • このエントリーをはてなブックマークに追加

Google App Engine(GAE)を利用する際、非同期処理にTaskQueueを使われている方は多いのではないでしょうか?GAEのTaskQueueには、GAEでタスク処理をするPushQueueの他に、GAE以外からも処理ができるPull Queueという種類があります。今回はそのPullQueueについて、GAEでタスク登録をして、GCEからタスクを確認し削除するところまでをご紹介します。

GAEのTask Queue は2種類ある!

GAEにてアプリケーションの処理(タスク)を Task Queue API に渡すことでユーザーからのリクエスト外で非同期に処理させることができる機能です。HTTPリクエスト経由でジョブ実行する仕組みで、ジョブはWebアプリケーションの特定のURLに対してHTTPリクエストを発行することで、ジョブの実行を行います。また、キューの処理は非同期で行われます。

GAEではリクエストを60秒以内に処理しないといけない、という特有のルールがありますが、その対策としても非同期で処理できるTask Queue は非常に有効です。Task Queue には、Push Queue とPull Queue の2種類があります。

Push Queue はGAEで実行する

Push Queue は、HTTP リクエストを GAEに送信して、タスクを実行します。
タスクには実行期限があり、スケーリングタイプに応じて期限が異なります。自動スケーリング サービスは、10 分以内に完了する必要があります。push タスク リクエストが 200~299 以外の HTTP ステータス コードを返した場合、またはタスクの期限までにレスポンスを返すことができなかった場合、GAEは成功するまでタスクを再試行します。確実な一定の頻度でリクエストをディスパッチします。このキューでは、確実なタスクの実行が保証されます。

Pull Queue はGAE以外でも実行できる

PullQueueはGAEだけではなく、外部のサービスが Queueからタスクを取得(リース)して処理することが出来ます。

画像処理など、GAEでは処理が難しいタスクに対して使うことが多いかと思います。GAE以外のサービスでは、Task Queue REST APIを利用してタスクをpullします。
また、PullQueueでは「タグ機能」が使用できます。タスクにタグを付けて、外部サービスが同じタグがついているタスクをまとめて処理することができる、とても便利な機能です。
(※現在2017年7月時点ではtag機能はベータ版です。)

Push Queue Pull Queue
処理するサービス(consumer) GAE GAE、GAE以外のサービス
タスクの登録をするサービス(producer) GAE GAE
1タスクの処理制限時間 10分以内(※1) 事前に指定したリース期間以内
タスクのリース期間(処理するまでの期間) 特に制限無し。 制限あり(最大1週間)。リース期間内に取得した全てのタスク処理をする必要がある。(出来なかった場合、再度リースが可能。)
Queueの定義

(queue.xml構成)

<mode>の追加は必要ない。 <mode> pull </ mode>指示文を追加する必要がある。
GCPのコンソールからできる操作 キューの一時停止、削除(キュー自体の削除)、消去(キュー内のすべての保留中のタスクを削除)ができる。 PushQueueと同じ。
タスクの再試行 GAEがタスクが成功するまでタスクを再試行する。(上限回数は設定可能) consumer(※2)側で制御する必要がある。
タスクの削除 タスク処理後、200番台のレスポンスが返ってくると、GAEで

Queueが削除される。

タスク処理後、タスクを明示的にAPIを呼んで削除する必要がある。(削除しない場合は、別のconsumer (※2) が再度タスクを処理する場合がある。)
タスクのディスパッチ する。 しない。
タグ機能 ない。 ある(バッチ処理に効果的)。
consumer(※2)のリソース管理 自動でスケールするので、必要ない。 タスクの処理量に基づいて調整する必要がある。

※1.自動スケーリングの場合。それ以外の場合は24 時間以内。
※2.タスクを処理する側のサービスの総称。上のイメージ図をご参考ください。

Pull Queue の使い方

では、実際にGAEにてPullQueueにjson形式のパラメータをタスクとして登録し、GCEからタスクを取得し、内容の確認、削除まで試してみます。ざっくりした流れは以下です。

  1. GCPプロジェクトの作成
    (今回の記事では省略します。こちらをご参考ください。)
  2. GAEでの事前準備
  3. GAEでタスクとしてjson形式のパラメータを登録
    GCEのインスタンス名をタグで指定し、デプロイをします。
    (GAE/Java で実装します。)
  4. キューからタスクを取得するためのアクセストークンの取得
  5. GCEから、自身のインスタンス名がタグつけされているタスクを取得(リース)
  6. パラメータ内容を確認
  7. GCEにて、タスクを削除

1.GAEでの事前準備

PullQueueを定義する

queue.xmlを作成し、以下のように設定します。

<queue-entries>
  <queue>
    <name>testQueue</name>
    <mode>pull</mode>
 <acl>
      <user-email>test@gmail.com</user-email>
      <writer-email>test@gmail.com</writer-email>
      <writer-email>test2@gmail.com</writer-email>
    </acl>
  </queue>
</queue-entries>

今回は、「testQueue」というPull Queueを作成します。PullQueueであることを指定するために、<mode>pull</mode>を追加しました。
(PushQueueの場合はmodelの指定は不要。)

また、Task Queue REST API を使用する場合は、acl ディレクティブを使用してアクセス制御リストを作成する必要があります。このディレクティブを使用するとで、アクセスを限定できます。acl要素には以下の2つのパラメータがあります。

  • user_email: ユーザーがタスクをリスト、取得、リース、削除、更新できる
  • writer_umail: ユーザーがタスクを挿入できる

デベロッパーが API のすべての機能にアクセスするには、デベロッパーのメールアドレスを user_email と writer_email の両方に指定する必要があります。(上の場合、test@gmail.comが全てのAPI呼び出しにアクセスできます。)

web.xmlを設定する

TestPullQueueServletをブラウザから実行できるように、web.xml は以下のように設定します。

< xml version="1.0" encoding="utf-8" >
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" version="2.5">
<servlet>
<servlet-name>TestPullQueue</servlet-name>
<servlet-class>com.yoshidumi.testpullqueue.TestPullQueueServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>TestPullQueue</servlet-name>
<url-pattern>/testpullqueue</url-pattern>
</servlet-mapping>
<welcome-file-list>
<welcome-file>index.html</welcome-file>
</welcome-file-list>
</web-app>

2.GAEでタスクとしてjson形式のパラメータを登録

タスクをPullQueueに追加する

queue.xmlに定義されているキュー名を使用してキューを取得し、TaskOptions.Method.PULLを使ってタスクを追加します。

package com.yoshidumi.testpullqueue;

import java.io.IOException;
import java.util.HashMap;
import java.util.logging.Logger;

import javax.servlet.http.*;

import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.appengine.repackaged.com.google.gson.Gson;

@SuppressWarnings("serial")
public class TestPullQueueServlet extends HttpServlet {

	private static final Logger LOGGER = Logger
			.getLogger(TestPullQueueServlet.class.getName());

	@Override
	public void doGet(HttpServletRequest req, HttpServletResponse resp)
			throws IOException {
		doPost(req, resp);
	}

	@Override
	public void doPost(HttpServletRequest req, HttpServletResponse resp)
			throws IOException {

		// タスクとして登録するパラメータ作成。
		HashMap<String,String> map = new HashMap<>();
		map.put("name", "yamauchi");
		map.put("gceName", "pullqueue-test");
		map.put("gceZone", "us-central1-c");

          // queue.xmlに定義されているキュー名を使用してキューを取得
		Queue queue = QueueFactory.getQueue("testQueue");
		String json = new Gson().toJson(map);
		String gceName = (String) map.get("gceName");
       
          // TaskOptions.Method.PULLを使ってタスクを追加し、タグ付けもする。
		queue.add(TaskOptions.Builder.withMethod(TaskOptions.Method.PULL)
				.payload(json.getBytes("UTF-8")).tag(gceName));
	}
}

今回はユーザーとGCEの情報をjson形式でパラメータとしてタスクに登録しています。
また、タグはタスクを取得する際に、フィルタとして機能します。GCEインスタンス名をタグで指定しました。
※このタグ機能は現時点ではベータ版です。(SLAの対象外です。)

Pull Queue にタスクを登録する

デプロイ後、ブラウザで以下URLを入力します。
http://{GCPのアプリケーションID}.appspot.com/testpullqueue

コンソール画面から Pull Queue のタスクを確認する

タスク登録後、コンソール画面に移動します。
コンソール画面URL : https://console.cloud.google.com
メニューのApp Engine→タスクキューに移動し、 「pull キュー」を選択します。
以下のように、登録したキュー名のタスクができていれば、Pull Queue のタスク登録は完了です。

登録したパラメータも確認しておきましょう。
キュー名をクリックして、キューの詳細画面に移動し、タスク名を選択します。

3.キューからタスクを取得するためのアクセストークンの取得

GCEからタスクを探しに行くのに、GoogleAPIを呼び出す準備をする必要があります。

OAuth クライアント ID の作成

GCPコンソール画面から、API Manager→認証情報画面に移動し、
「認証情報の作成」→「OAuth クライアント ID」を選択します。

ウェブアプリケーションを選択し、名前、リダイレクトURLに任意のものを入れてください。
今回は、承認済みのリダイレクトURIにはhttps://www.google.co.jp/を入力しました。

作成ボタンをクリック後、client_idとclient_secretが出てくるので、メモします。
後ほど、アクセストークンを要求する際に使います。

以下のように作成できればOAuthクライアントIDの完成です!
赤枠で囲ったクライアントIDを使って、認証コードを取得します。

認証コードを取得する

ブラウザで以下にアクセスします。
{クライアントID}の部分に先程取得したクライアントIDを入れます。
scopeに複数の値を設定する場合は「%20」で連結させます。

https://accounts.google.com/o/oauth3/v2/auth response_type=code
&client_id={クライアントID}
&redirect_uri={リダイレクトURI}
&scope=https://www.googleapis.com/auth/taskqueue%20https://www.googleapis.com/auth/taskqueue.consumer%20https://www.googleapis.com/auth/cloud-taskqueue%20https://www.googleapis.com/auth/cloud-taskqueue.consumer&access_type=offline

以下のようなURLにリダイレクトされるので認証コードをメモします。
https://www.google.co.jp/ code={認証コード}#
(「https://www.google.co.jp」の部分はリダイレクトURLに依存します。)

アクセストークンを要求する

GCEを起ち上げ、SSH接続します。
以下のcurlコマンドで、アクセストークンを要求します。
{認証コード}、{client_id}、{client_secret}にそれぞれ先程メモした値を入れます。

curl -d “code={認証コード}” -d “client_id={client_id}” -d “client_secret={client_secret}” -d
“grant_type=authorization_code” -d “access_type=offline” -d
“redirect_uri=https://www.google.co.jp/” https://www.googleapis.com/oauth3/v4/token

返ってきた、以下のjsonからアクセストークンを取得できます。
(”expires_in”はアクセストークンの有効期間(秒)です。)

{
“access_token”:”{アクセストークン}”,
“token_type”: “Bearer”,
“expires_in”: 3600,
“refresh_token”: “{リフレッシュトークン}”
}

以上でアクセストークンの取得は完了です!

4.GCEから、自身のインスタンス名がタグつけされているタスクを取得(リース)

取得したアクセストークンを使って、Task Queue REST APIをたたきます。
その前に、Task Queue REST APIで用意されている各メソッドの概要をざっとご紹介しましょう。今回使ったのはCloud Tasks* のメソッドです。

メソッド 概要
delete TaskQueueからタスクを削除します。
get TaskQueue内の指定されたタスクを1つ取得します。
insert タスクを既存のキューに挿入します。
lease タスクを取得(リース)します。リースするタスクの数(最大 1,000 個)を指定する必要があり、指定した数のタスクが古い順に返されます。(タグも指定ができます。)
list TaskQueue内の削除されていないすべてのタスクを、現在リースされているかどうかにかかわらず、最大100までリストします。(タグの指定は出来ません。)
patch TaskQueueからリースされたタスクを更新します。
update タスクリースの期間を更新します。

* 現時点ではCloud Tasks は、Task Queue REST APIのアルファーリリースの機能です。

・タスクの取得
タスクの取得にはleaseとlistと言う2つのメソッドが用意されてますが、listメソッドはリースされたものも含めて一括して全部取得する際に利用するもので、今回はリース目的なので普通にleaseメソッドを利用します。

leaseメソッドのパラメータ

leaseメソッドのパラメータについて簡単に紹介します。
HTTP request

POST https://www.googleapis.com/taskqueue/v1beta2/projects/project名/taskqueues/taskqueue名/tasks/lease

以下6つのパラメータが利用出来ます。
・Pathのパラメータ
  project :プロジェクト名
  taskqueue :キュー名
・必須のクエリパラメータ
  leaseSecs :リースする期間(秒単位)
  numTasks :リースするタスク数
・オプションのクエリパラメータ(タグを指定したい場合に利用します。)
  groupByTag :trueの場合、指定した同じタグのタスクを返します。
  tag :groupByTagがtrueの場合にのみ、タグを指定します。

今回はtagを指定しているので、オプションのクエリパラメータも使いました。
アクセストークンを使って、タグ「pullqueue-test」で指定しているタスクを1つリースします。

curl -H “Authorization: Bearer
ya29.GlyUBDGHvXtBgv27E-PCMJ5azVXWmulSSyHiAEcdA5Tsesm1ZItIX10jgomgCwUsQc_IOIA5Vixy6Iy4c9NzX4GCmDwIMvX-Zb0qCCXQWLJo6y6nUq9Ewbwap5Jm7g” -X POST
https://www.googleapis.com/taskqueue/v1beta2/projects/apps-gcp-test/taskqueues/testQueue/tasks/lease -d “groupByTag=true” -d “leaseSecs=3600” -d “numTasks=1” -d “tag=pullqueue-test”

リースの結果として、指定したタグのタスクが1つ返されました。

{ 
"kind": "taskqueue#tasks",
 "items": [
  {
   "kind": "taskqueues#task",
   "id": "3936773102613155867",
   "queueName": "projects/s~apps-gcp-test/taskqueues/testQueue",
"payloadBase64":"eyJuYW1lIjoieWFtYXVjaGkiLCJnY2VOYW1lIjoicHVsbHF1ZXVlLXRlc3QiLCJnY2Vab25lIjoidXMtY2VudHJhbDEtYyJ9",
   "enqueueTimestamp": "1501053065000000",
   "leaseTimestamp": "1501057395108276",
   "retry_count": 0,
   "tag": "pullqueue-test"
  }
 ]
}

返却値の項目の意味はこのようになっています。

{
 "kind": 返されるオブジェクトの種類、タスクのリスト,
 "items": [ ←タスクの実際のリスト  
  {
    "kind": 返されるオブジェクトの種類,
    "id":タスクの名前, ←タスクを削除する際にも利用します。
    "queueName": タスクが存在するキューの名前,
    "payloadBase64":実行するタスク(これはbase64でエンコードされた文字列です),
    "enqueueTimestamp": タスクがエンキューされた時刻,
    "leaseTimestamp": タスクのリースが期限切れになる時間,
    "retry_count": このタスクに既に適用されたリースの数,
    "tag": タスクのタグ  
  }
 ]
}

5.パラメータ内容を確認

payloadBase64の値は、Base64でエンコードされているため、下記のコマンドを使ってデコードします。

echo ‘Base64エンコードされた文字列’ | openssl enc -d -base64

実際にデコードしてみました。

echo
‘eyJuYW1lIjoieWFtYXVjaGkiLCJnY2VOYW1lIjoicHVsbHF1ZXVlLXRlc3QiLCJnY2Vab25lIjoidXMtY2VudHJhbDEtYyJ9’ | openssl enc -d -base64

コンソール画面と同じパラメータが確認できました!

{“name”:”yamauchi”,”gceName”:”pullqueue-test”,”gceZone”:”us-central1-c”}

6.GCEにて、タスクを削除

内容を確認できたので、再度アクセストークンを使い、GCE側からタスクを削除します。
deleteメソッドを利用します。
HTTP request

DELETE /project名/taskqueues/taskqueue名/tasks/taskID

taskIDは、lease結果で返ってきた”id”の値を使います。

実際にたたいてみた内容です。

curl -H “Authorization: Bearer
ya29.GlyUBDGHvXtBgv27E-PCMJ5azVXWmulSSyHiAEcdA5Tsesm1ZItIX10jgomgCwUsQc_IOIA5Vixy6Iy4c9NzX4GCmDwIMvX-Zb0qCCXQWLJo6y6nUq9Ewbwap5Jm7g” -X DELETE
https://content.googleapis.com/taskqueue/v1beta2/projects/s~apps-gcp-test/taskqueues/testQueue/tasks/3936773102613155867

GCPコンソール画面のpullキューから指定したタスク名のタスクが削除出来ているのを確認できました!

おわりに

いかがでしがでしょうか?
今回、GCEからPullQueueを触ろうとした際に、ドキュメントが少なかったので記事にしてみました。
GAEはとても便利ですが、画像処理など対応出来ないこともあります。そんな時にGCEや他のサービスからタスク処理ができれば更に快適にGAEを利用できるようになると思います!
ぜひ試してみてください。

  • このエントリーをはてなブックマークに追加

Google のクラウドサービスについてもっと詳しく知りたい、直接話が聞いてみたいという方のために、クラウドエースでは無料相談会を実施しております。お申し込みは下記ボタンより承っておりますので、この機会にぜひ弊社をご利用いただければと思います。

無料相談会のお申込みはこちら