# PythonでCloud Pub/Subのトピックとサブスクリプションを作成し、同期、非同期でpullする

Cloud Pub/Subのエミュレーターを使ってローカル環境でPythonから操作する。
なお、エミュレータはgcloud pubsubコマンドに対応していない
そのため、ローカルでPub/Subを確認するには必ずコードを書かなければならない。

# pubsub-emulatorをインストールする

gcloud components listpubsub-emulatorがインストール済みか確認する。

$ gcloud components list
┌─────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                  Components                                                 │
├───────────────┬──────────────────────────────────────────────────────┬──────────────────────────┬───────────┤
│     Status    │                         Name                         │            ID            │    Size   │
├───────────────┼──────────────────────────────────────────────────────┼──────────────────────────┼───────────┤
...
│ Not Installed │ Cloud Pub/Sub Emulator                               │ pubsub-emulator          │  34.8 MiB │
...
└───────────────┴──────────────────────────────────────────────────────┴──────────────────────────┴───────────┘

インストール済みでない場合、次のコマンドでpubsub-emulatorをインストールする。

$ gcloud components install pubsub-emulator
$ gcloud components update

アプリケーションがCloud Pub/Sub ではなくエミュレータに接続するように環境変数を設定する。
$(gcloud beta emulators pubsub env-init)PUBSUB_EMULATOR_HOSTという環境変数が設定される。
なお、エミュレータを起動するたびに、環境変数を設定する必要がある。

$ $(gcloud beta emulators pubsub env-init)
$ echo ${PUBSUB_EMULATOR_HOST}
localhost:8085

# PythonからCloud Pub/Subのエミュレーターを操作する

次の手順でCloud Pub/Subのエミュレーターを操作できる環境を作成する。

# Pythonのgoogle-cloud-pubsubライブラリをpipenvでインストールする

pipenvのインストールと使い方はこちらの記事 MacでPythonが動く環境を構築して、Flaskから文字列を返すところまでに記載している。

作業するディレクトリに空のPipfileを用意し、Python3.7が動く仮想環境を作成する。

$ touch Pipfile
$ pipenv install --python 3.7.3

Cloud Pub/SubのPythonライブラリをインストールする。

$ pipenv install google-cloud-pubsub==0.41.0

生成されたPipfileファイルを確認すると、たしかにgoogle-cloud-pubsubが入っている。

[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
google-cloud-pubsub = "==0.41.0"

[requires]
python_version = "3.7"

# トピックの作成

google.cloudモジュールからpubsubオブジェクトをインポートする。
pubsubオブジェクトからPublisherClientオブジェクトを取得する。
PublisherClientオブジェクトのtopic_pathメソッドによりトピックのパスを作成する。
topic_pathメソッドの第一引数はプロジェクトID、第二引数はトピック名を指定する。
そして、PublisherClientオブジェクトのcreate_topicメソッドによりトピックを作成する。

create_topic.pyを作成する。

from google.cloud import pubsub

project_id = 'using-pub-sub-emulator'
topic_name = 'my_topic'
client = pubsub.PublisherClient()
topic_path = client.topic_path(project_id, topic_name)
response = client.create_topic(topic_path)
print('Topic created: {}'.format(response))
$ pipenv run python create_topic.py
Topic created: name: "projects/using-pub-sub-emulator/topics/my_topic"

トピックの一覧を取得するコードで、トピックが作成できたか確認する。
PublisherClientオブジェクトのproject_pathメソッドでプロジェクトのパスを作成する。
project_pathメソッドの第一引数にはプロジェクトIDを指定する。projects/using-pub-sub-emulatorのような値が返ってくる。
PublisherClientオブジェクトのlist_topicsメソッドでトピックの一覧を取得する。 list_topicsメソッドの第一引数はプロジェクトのパスを指定する。

confirm_topic.py

from google.cloud import pubsub

project_id = 'using-pub-sub-emulator'
client = pubsub.PublisherClient()
project = client.project_path(project_id)
for element in client.list_topics(project):
  print(element)

confirm_topic.pyを実行するとトピックのパスが確認できる。

$ pipenv run python confirm-topic.py
name: "projects/using-pub-sub-emulator/topics/my_topic"

# サブスクリプションの作成

SubscriberClientオブジェクトのsubscription_pathメソッドでサブスクリプションのパスを作成する。
project_pathメソッドの第一引数にはプロジェクトID、第二引数にはサブスクリプション名を指定する。projects/using-pub-sub-emulator/subscriptions/my_subscriptionのような値が返ってくる。
SubscriberClientオブジェクトのcreate_subscriptionメソッドでサブスクリプションを作成する。
create_subscriptionメソッドの第一引数にはサブスクリプションのパス、第二引数にはトピックのパスを指定する。

create_subscription.py

from google.cloud import pubsub

project_id = 'using-pub-sub-emulator'
topic_name = 'my_topic'
subscription_name = 'my_subscription'

publisher_client = pubsub.PublisherClient()
subscriber_client = pubsub.SubscriberClient()

topic_path = publisher_client.topic_path(project_id, topic_name)
subscription_path = subscriber_client.subscription_path(
    project_id, subscription_name)

response = subscriber_client.create_subscription(subscription_path, topic_path)
print('Subscription created: {}'.format(response))

confirm_topic.pyを実行するとサブスクリプションのパスが確認できる。

$ pipenv run python create_subscription.py
Subscription created: name: "projects/using-pub-sub-emulator/subscriptions/my_subscription"
topic: "projects/using-pub-sub-emulator/topics/my_topic"
push_config {
}
ack_deadline_seconds: 10
message_retention_duration {
  seconds: 604800
}

# パブリッシャーはメッセージをトピックに登録する

PublisherClientオブジェクトのpublishメソッドでメッセージをパブリッシュする。
publishメソッドの第一引数はトピックのパス、キーワード引数dataにはメッセージのデータを指定する。
メッセージのデータはUTF-8でエンコードしてバイナリにしておく。(data.encode('utf-8'))

publish-message.py

from google.cloud import pubsub
import random

project_id = 'using-pub-sub-emulator'
topic_name = 'my_topic'
publisher = pubsub.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
  data = 'メッセージです ランダムな数字 {}'.format(random.random())
  data = data.encode('utf-8')
  future = publisher.publish(topic_path, data=data)
  print('Published {} of message ID {}.'.format(data.decode(), future.result()))

publish-message.pyを実行すると10個のメッセージがパブリッシュされる。

$ pipenv run python publish-message.py
Published メッセージです ランダムな数字 0.4473929581709073 of message ID 39.
Published メッセージです ランダムな数字 0.3743428720261527 of message ID 40.
Published メッセージです ランダムな数字 0.8932411308402567 of message ID 41.
Published メッセージです ランダムな数字 0.9453641446165716 of message ID 42.
Published メッセージです ランダムな数字 0.03304974995298049 of message ID 43.
Published メッセージです ランダムな数字 0.5652751880659433 of message ID 44.
Published メッセージです ランダムな数字 0.9222733287467317 of message ID 45.
Published メッセージです ランダムな数字 0.9817410397902789 of message ID 46.
Published メッセージです ランダムな数字 0.4486627053292169 of message ID 47.

# サブスクライバーはメッセージをサブスクリプションからpullで受け取る

メッセージを受け取る方法は同期、非同期の2種類がある。
まずは同期的にpullする方法を見て、その次に非同期的にpullする方法を確認する。

# 同期でpullする

メッセージを同期にpullで受け取る。
SubscriberClientオブジェクトのpullメソッドでメッセージを受け取る。
pullメソッドの第一引数はサブスクリプションのパス、キーワード引数max_messagesはこのリクエストで返されるメッセージの最大数を指定する。 戻り値はPullResponseオブジェクトで、received_messages`属性に受け取ったメッセージの配列が格納されている。メッセージがない場合は空の配列が返される。

各メッセージは次のようにack_idとmessageが格納されており、メッセージのデータはmessage.dataから取得できる。

ack_id: "projects/using-pub-sub-emulator/subscriptions/my_subscription:11290"
message {
  data: "\343\203\241\343\203\203\343\202\273\343\203\274\343\202\270\343\201\247\343\201\231 \343\203\251\343\203\263\343\203\200\343\203\240\343\201\252\346\225\260\345\255\227 0.06807183426239971"
  message_id: "60"
  publish_time {
    seconds: 1559045847
  }
}

そして、SubscriberClientオブジェクトのacknowledgeメソッドでメッセージの応答を返す。
acknowledgeメソッド第一引数はサブスクリプションのパス、第二引数はack_idの配列を返す。

synchronous-pull.py

from google.cloud import pubsub

project_id = 'using-pub-sub-emulator'
subscription_name = 'my_subscription'

subscriber = pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

NUM_MESSAGES = 3

response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

ack_ids = []
for received_message in response.received_messages:
    print("Received: {}".format(received_message.message.data.decode()))
    ack_ids.append(received_message.ack_id)

subscriber.acknowledge(subscription_path, ack_ids)

print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))

synchronous-pull.pyを実行すると、NUM_MESSAGESで指定した数のメッセージを受け取ることができている。

$ pipenv run python synchronous-pull.py
Received: メッセージです ランダムな数字 0.06807183426239971
Received: メッセージです ランダムな数字 0.9155834358933995
Received: メッセージです ランダムな数字 0.5783843824115955
Received and acknowledged 3 messages. Done.

# 非同期でpullする

メッセージを非同期にpullで受け取る。
バックグラウンドで非同期にメッセージを処理できるようにするために、メインスレッドが終了しないようにする。while True: time.sleep(60)
SubscriberClientオブジェクトのsubscribeメソッドでメッセージを受け取る。
subscribeメソッドの第一引数はサブスクリプションのパス、キーワード引数callbackはメッセージを受け取った際のコールバック関数を指定する。
コールバック関数の引数はMessage (opens new window)オブジェクトを受け取る。data属性にメッセージのデータが格納されており、ackメソッドでメッセージを受け取ったことを伝え、再度メッセージがこないようにする。

async-pull.py

import time

from google.cloud import pubsub

project_id = 'using-pub-sub-emulator'
subscription_name = 'my_subscription'

subscriber = pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)


def callback(message):
    print('Received message: {}'.format(message.data.decode()))
    message.ack()


subscriber.subscribe(subscription_path, callback=callback)

print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

# 関連記事

Cloud Pub/Subの配信タイプpullのサブスクリプションをgcloudツールで試す

# 参考

https://cloud.google.com/pubsub/docs/emulator (opens new window)
https://googleapis.github.io/google-cloud-python/latest/pubsub/ (opens new window)
https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/pubsub/cloud-client/publisher.py (opens new window)
https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/pubsub/cloud-client/subscriber.py (opens new window)