Using the API#
Authentication / Configuration#
Use
Client
objects to configure your applications.In addition to any authentication configuration, you should also set the
GOOGLE_CLOUD_PROJECT
environment variable for the project you’d like to interact with. If you are Google App Engine or Google Compute Engine this will be detected automatically.The library now enables the
gRPC
transport for the pubsub API by default, assuming that the required dependencies are installed and importable. To disable this transport, set theGOOGLE_CLOUD_DISABLE_GRPC
environment variable to a non-empty string, e.g.:$ export GOOGLE_CLOUD_DISABLE_GRPC=true
.Client
objects hold both aproject
and an authenticated connection to the PubSub service.The authentication credentials can be implicitly determined from the environment or directly via
from_service_account_json
andfrom_service_account_p12
.After setting
GOOGLE_APPLICATION_CREDENTIALS
andGOOGLE_CLOUD_PROJECT
environment variables, create aClient
>>> from google.cloud import pubsub >>> client = pubsub.Client()
Manage topics for a project#
List topics for the default project:
topics, token = client.list_topics() # API request
while True:
for topic in topics:
do_something_with(topic)
if token is None:
break
topics, token = client.list_topics(page_token=token) # API request
Create a new topic for the default project:
topic = client.topic(TOPIC_NAME)
topic.create() # API request
Check for the existence of a topic:
assert not topic.exists() # API request
topic.create() # API request
assert topic.exists() # API request
Delete a topic:
assert topic.exists() # API request
topic.delete()
assert not topic.exists() # API request
Fetch the IAM policy for a topic:
policy = topic.get_iam_policy() # API request
Update the IAM policy for a topic:
ALL_USERS = policy.all_users()
policy.viewers.add(ALL_USERS)
LOGS_GROUP = policy.group('cloud-logs@google.com')
policy.editors.add(LOGS_GROUP)
new_policy = topic.set_iam_policy(policy) # API request
Test permissions allowed by the current IAM policy on a topic:
from google.cloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE
TO_CHECK = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE]
ALLOWED = topic.check_iam_permissions(TO_CHECK)
assert set(ALLOWED) == set(TO_CHECK)
Publish messages to a topic#
Publish a single message to a topic, without attributes:
topic.publish(b'This is the message payload') # API request
Publish a single message to a topic, with attributes:
topic.publish(b'Another message payload', extra='EXTRA') # API request
Publish a set of messages to a topic (as a single request):
with topic.batch() as batch:
batch.publish(PAYLOAD1)
batch.publish(PAYLOAD2, extra=EXTRA)
Note
The only API request happens during the __exit__()
of the topic
used as a context manager, and only if the block exits without raising
an exception.
Manage subscriptions to topics#
List all subscriptions for the default project:
subscriptions, token = client.list_subscriptions() # API request
while True:
for subscription in subscriptions:
do_something_with(subscription)
if token is None:
break
subscriptions, token = client.list_subscriptions(
page_token=token) # API request
List subscriptions for a topic:
subscriptions, token = topic.list_subscriptions() # API request
while True:
for subscription in subscriptions:
do_something_with(subscription)
if token is None:
break
subscriptions, token = topic.list_subscriptions(
page_token=token) # API request
Create a new pull subscription for a topic, with defaults:
sub_defaults = topic.subscription(SUB_DEFAULTS)
Create a new pull subscription for a topic with a non-default ACK deadline:
sub_ack90 = topic.subscription(SUB_ACK90, ack_deadline=90)
Create a new push subscription for a topic:
subscription = topic.subscription(SUB_PUSH, push_endpoint=PUSH_URL)
subscription.create() # API request
Check for the existence of a subscription:
assert subscription.exists() # API request
Convert a pull subscription to push:
subscription.modify_push_configuration(
push_endpoint=PUSH_URL) # API request
Convert a push subscription to pull:
subscription.modify_push_configuration(push_endpoint=None) # API request
Re-synchronize a subscription with the back-end:
subscription.reload() # API request
Fetch the IAM policy for a subscription
policy = subscription.get_iam_policy() # API request
Update the IAM policy for a subscription:
ALL_USERS = policy.all_users()
policy.viewers.add(ALL_USERS)
LOGS_GROUP = policy.group('cloud-logs@google.com')
policy.editors.add(LOGS_GROUP)
new_policy = subscription.set_iam_policy(policy) # API request
Test permissions allowed by the current IAM policy on a subscription:
from google.cloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE
TO_CHECK = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE]
ALLOWED = subscription.check_iam_permissions(TO_CHECK)
assert set(ALLOWED) == set(TO_CHECK)
Delete a subscription:
subscription.delete() # API request
Pull messages from a subscription#
Fetch pending messages for a pull subscription:
pulled = subscription.pull(max_messages=2)
Note that received messages must be acknowledged, or else the back-end will re-send them later:
for ack_id, message in pulled:
try:
do_something_with(message)
except ApplicationException as e:
log_exception(e)
else:
subscription.acknowledge([ack_id])
Fetch messages for a pull subscription without blocking (none pending):
pulled = subscription.pull(return_immediately=True)
Update the acknowlegement deadline for pulled messages:
for ack_id, _ in pulled:
subscription.modify_ack_deadline(ack_id, 90) # API request
Fetch pending messages, acknowledging those whose processing doesn’t raise an error:
from google.cloud.pubsub.subscription import AutoAck
with AutoAck(subscription, max_messages=10) as ack:
for ack_id, message in list(ack.items()):
try:
do_something_with(message)
except Exception: # pylint: disable=broad-except
del ack[ack_id]
Note
The pull
API request occurs at entry to the with
block, and the
acknowlege
API request occurs at the end, passing only the ack_ids
which haven’t been deleted from ack