Skip to content

Commit

Permalink
docs: Update sample code snippets in the main readme (#71)
Browse files Browse the repository at this point in the history
* Update example code in readme

* comment
  • Loading branch information
tmdiep authored May 21, 2020
1 parent e364772 commit 603b4a7
Showing 1 changed file with 75 additions and 43 deletions.
118 changes: 75 additions & 43 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ If you are using Maven, add this to your pom.xml file
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.1.3</version>
</dependency>

<!-- A logging dependency used by the underlying library -->
<dependency>
<groupId>com.google.flogger</groupId>
<artifactId>flogger-system-backend</artifactId>
<version>0.5.1</version>
<scope>runtime</scope>
</dependency>
```
If you are using Gradle, add this to your dependencies
```Groovy
Expand Down Expand Up @@ -85,6 +93,9 @@ publishers. Add the following imports at the top of your file:

```java
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.cloud.pubsublite.proto.Topic.*;
import com.google.protobuf.util.Durations;
```
Then, to create the topic, use the following code:

Expand All @@ -93,28 +104,28 @@ CloudRegion region = CloudRegion.of("us-central1");
TopicPath topicPath =
TopicPaths.newBuilder()
.setZone(CloudZone.of(region, 'b'))
.setProjectNumber(ProjectNumber.of(123))
.setProjectNumber(ProjectNumber.of(123L)) // Your project number.
.setTopicName(TopicName.of("my-new-topic"))
.build();

Topic topic =
Topic.newBuilder()
.setPartitionConfig(
PartitionConfig.newBuilder()
.setScale(1) // Set publishing throughput to 1*4 MiB per sec. This must be 1-4.
.setCount(PARTITIONS))
.setCount(2)) // The number of partitions.
.setRetentionConfig(
RetentionConfig.newBuilder()
.setPeriod(Durations.fromDays(7))
.setPerPartitionBytes(100000000000L)) // 100 GiB. This must be 30 GiB-10 TiB.
.setName(topicPath.value())
.build();

try (AdminClient client =
AdminClientBuilder.builder()
.setRegion(region)
.build()) {
client.createTopic(topic).get();
AdminClientSettings settings =
AdminClientSettings.newBuilder().setRegion(region).build();
try (AdminClient client = AdminClient.create(settings)) {
Topic response = client.createTopic(topic).get();
System.out.println(response.getAllFields());
}
```

Expand All @@ -123,8 +134,12 @@ try (AdminClient client =
With Pub/Sub Lite, you can publish messages to a topic. Add the following import at the top of your file:

```java
import com.google.api.core.*;
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.cloudpubsub.*;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.*;
```
Then, to publish messages asynchronously, use the following code:

Expand All @@ -139,12 +154,12 @@ public class PublisherExample {
// Load the topic name from a commandline flag.
private static final String TOPIC_NAME = "my-new-topic";

public static List<ApiFuture<String>> runPublisher(PublisherInterface publisher) throws Exception {
public static List<ApiFuture<String>> runPublisher(Publisher publisher) throws Exception {
List<ApiFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "message-" + i;

// convert message to bytes
// Convert the message to a byte string.
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

Expand All @@ -157,25 +172,27 @@ public class PublisherExample {

// Publish messages to a topic.
public static void main(String[] args) throws Exception {
PublisherApiService publisherService =
PublisherBuilder.newBuilder()
PublisherSettings settings =
PublisherSettings.newBuilder()
.setTopicPath(
TopicPaths.newBuilder()
.setProjectNumber(ProjectNumber.of(PROJECT_NUMBER))
.setZone(CloudZone.parse(ZONE))
.setTopicName(TopicName.of(TOPIC_NAME))
.build())
.build();
publisherService.startAsync().awaitRunning();
List<ApiFuture<String>> futureAckIds = runPublisher(publisherService);
publisherService.stopAsync().awaitTerminated();
Publisher publisher = Publisher.create(settings);
publisher.startAsync().awaitRunning();
List<ApiFuture<String>> futureAckIds = runPublisher(publisher);
publisher.stopAsync().awaitTerminated();

List<String> ackIds = ApiFutures.allAsList(futureAckIds).get();
ArrayList<PublishMetadata> metadata = new ArrayList<>();
for (String id : ackIds) {
metadata.add(PublishMetadata.decode(id));
}
for (PublishMetadata one : metadata) {
System.out.println(one.offset());
System.out.println(one);
}
}
}
Expand All @@ -188,15 +205,17 @@ single, specific topic. Add the following imports at the top of your file:

```java
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Subscription.*;
import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.*;
```
Then, to create the subscription, use the following code:

```java
// CloudZone must be equivalent to the topic.
CloudRegion cloudRegion = CloudRegion.of("us-central1");
CloudZone zone = CloudZone.of(cloudRegion, 'b');
// ProjectNumber must the equivalent to the topic.
ProjectNumber projectNum = ProjectNumber.of(123);
// CloudZone must be the zone of the topic.
CloudRegion region = CloudRegion.of("us-central1");
CloudZone zone = CloudZone.of(region, 'b');
ProjectNumber projectNum = ProjectNumber.of(123L);
TopicName topicName = TopicName.of("my-new-topic");
SubscriptionName subscriptionName = SubscriptionName.of("my-new-sub");

Expand All @@ -223,11 +242,11 @@ Subscription.newBuilder()
.setTopic(topicPath.value())
.build();

try (AdminClient client =
AdminClientBuilder.builder()
.setRegion(region)
.build()) {
client.createSubscription(subscription).get();
AdminClientSettings settings =
AdminClientSettings.newBuilder().setRegion(region).build();
try (AdminClient client = AdminClient.create(settings)) {
Subscription response = client.createSubscription(subscription).get();
System.out.println(response.getAllFields());
}
```

Expand All @@ -237,29 +256,37 @@ With Pub/Sub Lite you can receive messages from a subscription. Add the
following imports at the top of your file:

```java
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.cloudpubsub.*;
import com.google.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.SubscriberInterface;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import java.util.*;
```
Then, to pull messages asynchronously, use the following code:

```java
CloudZone zone = CloudZone.parse("us-central1-b");
ProjectNumber projectNum = ProjectNumber.of(123L);
SubscriptionName subscriptionName = SubscriptionName.of("my-new-sub");

SubscriptionPath subscriptionPath =
SubscriptionPaths.newBuilder()
.setZone(zone)
.setProjectNumber(projectNum)
.setSubscriptionName(subscriptionName)
.build();
// Connect to partition 0.
Partition PARTITION = Partition.of(0);
SubscriptionPaths.newBuilder()
.setZone(zone)
.setProjectNumber(projectNum)
.setSubscriptionName(subscriptionName)
.build();

FlowControlSettings flowControlSettings =
FlowControlSettings.builder()
.setBytesOutstanding(10_000_000) // 10 MB per partition.
.setMessagesOutstanding(Long.MAX_VALUE)
.build();
.setBytesOutstanding(10 * 1024 * 1024L) // 10 MiB per partition.
.setMessagesOutstanding(Long.MAX_VALUE)
.build();

// Connect to partitions 0, 1. Note that we configured the topic with 2
// partitions.
List<Partition> partitions = Arrays.asList(Partition.of(0), Partition.of(1));

MessageReceiver receiver =
new MessageReceiver() {
Expand All @@ -276,19 +303,20 @@ MessageReceiver receiver =
}
};

SubscriberInterface subscriber = null;
Subscriber subscriber = null;
try {
subscriber = Subscriber.create(SubscriberSetttings.newBuilder()
subscriber = Subscriber.create(SubscriberSettings.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setFlowControlSettings(flowControlSettings)
.setPerPartitionFlowControlSettings(flowControlSettings)
.setReceiver(receiver)
.setPartitions(List.of(Partition.of(0)))
.setPartitions(partitions)
.build());
subscriber.addListener(
new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
// Handle failure. This is called when the Subscriber encountered a fatal error and is shutting down.
// Handle failure. This is called when the Subscriber encountered a
// fatal error and is shutting down.
System.err.println(failure);
}
},
Expand All @@ -302,6 +330,10 @@ try {
}
```

## Samples

More sample code can be found in the [samples](samples) folder.

## Troubleshooting

To get help, follow the instructions in the [shared Troubleshooting document][troubleshooting].
Expand Down

0 comments on commit 603b4a7

Please sign in to comment.