Skip to content

Commit

Permalink
Document different PuSub received message types
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed Jun 24, 2019
1 parent 09e36cd commit e1b1daa
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 14 deletions.
3 changes: 2 additions & 1 deletion pubsub/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ block the current thread until a given condition obtains:
except KeyboardInterrupt:
future.cancel()
To learn more, consult the `subscriber documentation`_.
It is also possible to pull messages in a synchronous (blocking) fashion. To
learn more, consult the `subscriber documentation`_.

.. _subscriber documentation: https://googleapis.github.io/google-cloud-python/latest/pubsub/subscriber/index.html
53 changes: 40 additions & 13 deletions pubsub/docs/subscriber/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,48 @@ to subscribe to, and it must already exist. Once you have that, it is easy:
sub_name = 'projects/{project}/subscriptions/{subscription}'
subscriber.create_subscription(sub_name, topic_name)
Once you have created a subscription (or if you already had one), the next
step is to pull data from it.

Pulling a Subscription
----------------------

Once you have created a subscription (or if you already had one), the next
step is to pull data from it. The subscriber client uses the
Pulling a Subscription Synchronously
------------------------------------

To pull the messages synchronously, use the client's
:meth:`~.pubsub_v1.subscriber.client.Client.pull` method.

.. code-block:: python
# Substitute PROJECT and SUBSCRIPTION with appropriate values for your
# application.
subscription_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION)
response = subscriber.pull(subscription_path, max_messages=5)
for msg in response.received_messages:
print("Received message:", msg.message.data)
ack_ids = [msg.ack_id for msg in response.received_messages]
subscriber.acknowledge(subscription_path, ack_ids)
The method returns a :class:`~.pubsub_v1.types.PullResponse` instance that
cointains a list of received :class:`~.pubsub_v1.types.ReceivedMessage`
instances.


Pulling a Subscription Asynchronously
-------------------------------------

The subscriber client uses the
:meth:`~.pubsub_v1.subscriber.client.Client.subscribe` method to start a
background thread to receive messages from Pub/Sub and calls a callback with
each message received.

.. code-block:: python
# As before, substitute {project} and {subscription} with appropriate
# values for your application.
future = subscriber.subscribe(
'projects/{project}/subscriptions/{subscription}',
callback
)
# Substitute PROJECT and SUBSCRIPTION with appropriate values for your
# application.
subscription_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION)
future = subscriber.subscribe(subscription_path, callback)
This will return a
:class:`~.pubsub_v1.subscriber.futures.StreamingPullFuture`. This future allows
Expand All @@ -71,8 +95,11 @@ Messages received from a subscription are processed asynchronously through
**callbacks**.

The basic idea: Define a function that takes one argument; this argument
will be a :class:`~.pubsub_v1.subscriber.message.Message` instance. This
function should do whatever processing is necessary. At the end, the
will be a :class:`~.pubsub_v1.subscriber.message.Message` instance, which is
a convenience wrapper around the :class:`~.pubsub_v1.types.PubsubMessage`
instance received from the server (and stored under the ``message`` attribute).

This function should do whatever processing is necessary. At the end, the
function should either :meth:`~.pubsub_v1.subscriber.message.Message.ack`
or :meth:`~.pubsub_v1.subscriber.message.Message.nack` the message.

Expand All @@ -87,7 +114,7 @@ Here is an example:
# Note that the callback is defined *before* the subscription is opened.
def callback(message):
do_something_with(message) # Replace this with your actual logic.
message.ack()
message.ack() # Asynchronously acknowledge the message.
# Open the subscription, passing the callback.
future = subscriber.subscribe(
Expand Down

0 comments on commit e1b1daa

Please sign in to comment.