Azure Service Bus client module for Go

Azure Service Bus is a highly reliable cloud messaging service for providing real-time and fault-tolerant communication between distributed senders and receivers.

Use the Service Bus client module github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus in your application to:

  • Send messages to a Service Bus queue or topic
  • Receive messages from a Service Bus queue or subscription

Key links: Source code | API reference documentation | REST API documentation | Product documentation | Samples

If you used the pre-release azure-service-bus-go module, see the Migration guide.

Getting started

Prerequisites

  • Go, version 1.18 or higher - Install Go
  • Azure subscription - Create a free account
  • Service Bus namespace - Create a namespace
  • A Service Bus queue, topic, or subscription - See the Azure Service Bus documentation to create an entity in your Service Bus namespace. For example, create a Service Bus queue using the Azure portal, the Azure CLI, or other tools.

Install the package

Install the Azure Service Bus client module for Go with go get:

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus

Authenticate the client

The Service Bus Client can be created using a credential from the Azure Identity package, such as DefaultAzureCredential, or using a Service Bus connection string.

Using a service principal

import (
  "context"

  "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
  "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
  // For more information about the DefaultAzureCredential, see:
  // https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#NewDefaultAzureCredential
  credential, err := azidentity.NewDefaultAzureCredential(nil)

  if err != nil {
    panic(err)
  }

  // The service principal specified by the credential needs to be added to the appropriate Service Bus roles for your
  // resource. For more information about Service Bus roles, see:
  // https://learn.microsoft.com/azure/service-bus-messaging/service-bus-managed-service-identity#azure-built-in-roles-for-azure-service-bus
  client, err := azservicebus.NewClient("<ex: myservicebus.servicebus.windows.net>", credential, nil)

  if err != nil {
    panic(err)
  }

  // Close the client when main returns. Using the client here also keeps the
  // snippet compiling, since Go rejects unused variables.
  defer client.Close(context.TODO())
}

Using a connection string

import (
  "context"

  "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
  // For instructions on how to get a Service Bus connection string, see:
  // https://learn.microsoft.com/azure/service-bus-messaging/service-bus-quickstart-portal#get-the-connection-string
  connectionString := "<your Service Bus connection string>"

  client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)

  if err != nil {
    panic(err)
  }

  // Close the client when main returns.
  defer client.Close(context.TODO())
}

Key concepts

Once you've created a Client, you can interact with resources within a Service Bus namespace:

  • Queue: Allows for sending and receiving messages. Often used for point-to-point communication.
  • Topic: Similar to a queue, but splits sending and receiving into separate entities. Messages are sent to a topic and broadcast to its subscriptions, where receivers can consume them independently and in parallel.
  • Subscription: Consumes messages from a topic. Each subscription is independent, and receives a copy of each message sent to the topic. Rules and filters can be used to tailor which messages are received by a specific subscription.

For more information about these resources, see What is Azure Service Bus?.

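Using a Client, you can send messages to a queue or topic through a Sender, and receive messages from a queue or subscription through a Receiver. See the Send messages and Receive messages examples.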

The queues, topics, and subscriptions should be created prior to using this library.

Examples

The following examples cover common tasks using Azure Service Bus:

Send messages

Once you've created a Client you can create a Sender, which will allow you to send messages.

NOTE: To create an azservicebus.Client, see the Authenticate the client section, using a service principal or a Service Bus connection string.

Send a single message:

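sender, err := client.NewSender("<queue or topic>", nil)
if err != nil {
  panic(err)
}
// Send a single message. Handle the returned err as in the check above.
err = sender.SendMessage(context.TODO(), &azservicebus.Message{Body: []byte("hello world!")}, nil)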

Send multiple messages using a batch

You can also send messages in batches, which can be more efficient than sending them individually.

// Create a message batch. It will automatically be sized for the Service Bus
// namespace's maximum message size.
currentMessageBatch, err := sender.NewMessageBatch(context.TODO(), nil)

if err != nil {
  panic(err)
}

messagesToSend := []*azservicebus.Message{
  // any messages that you'd want to send would be here, or sourced from
  // somewhere else.
}

for i := 0; i < len(messagesToSend); i++ {
  // Add a message to our message batch. This can be called multiple times.
  err = currentMessageBatch.AddMessage(messagesToSend[i], nil)

  if errors.Is(err, azservicebus.ErrMessageTooLarge) {
    if currentMessageBatch.NumMessages() == 0 {
      // This means the message itself is too large to be sent, even on its own.
      // This will require intervention from the user.
      panic("Single message is too large to be sent in a batch.")
    }

    fmt.Printf("Message batch is full. Sending it and creating a new one.\n")

    // send what we have since the batch is full
    err := sender.SendMessageBatch(context.TODO(), currentMessageBatch, nil)

    if err != nil {
      panic(err)
    }

    // Create a new batch and retry adding this message to our batch.
    newBatch, err := sender.NewMessageBatch(context.TODO(), nil)

    if err != nil {
      panic(err)
    }

    currentMessageBatch = newBatch

    // rewind the counter and attempt to add the message again (this batch
    // was full so it didn't go out with the previous SendMessageBatch call).
    i--
  } else if err != nil {
    panic(err)
  }
}

// check if any messages are remaining to be sent.
if currentMessageBatch.NumMessages() > 0 {
  err := sender.SendMessageBatch(context.TODO(), currentMessageBatch, nil)

  if err != nil {
    panic(err)
  }
}

Receive messages

Once you've created a Client you can create a Receiver, which will allow you to receive messages.

NOTE: To create an azservicebus.Client, see the Authenticate the client section, using a service principal or a Service Bus connection string.

receiver, err := client.NewReceiverForQueue(
  "<queue>",
  nil,
)
// or
// client.NewReceiverForSubscription("<topic>", "<subscription>", nil)

if err != nil {
  panic(err)
}

// ReceiveMessages respects the passed in context, and will gracefully stop
// receiving when 'ctx' is cancelled.
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
defer cancel()

messages, err := receiver.ReceiveMessages(ctx,
  // The number of messages to receive. Note this is merely an upper
  // bound. It is possible to get fewer messages (or zero), depending
  // on the contents of the remote queue or subscription and network
  // conditions.
  1,
  nil,
)

if err != nil {
  panic(err)
}

for _, message := range messages {
  // The message body is a []byte. For this example we're assuming that the body
  // is a string, converted to bytes, but any []byte payload is valid.
  var body []byte = message.Body
  fmt.Printf("Message received with body: %s\n", string(body))

  // For more information about settling messages, see:
  // https://learn.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
  err = receiver.CompleteMessage(context.TODO(), message, nil)

  if err != nil {
    var sbErr *azservicebus.Error

    if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeLockLost {
      // The message lock has expired. This isn't fatal for the client, but it does mean
      // that this message can be received by another Receiver (or potentially this one).
      fmt.Printf("Message lock expired\n")

      // You can extend the message lock by calling Receiver.RenewMessageLock(msg) before the
      // message lock has expired.
      continue
    }

    panic(err)
  }

  fmt.Printf("Received and completed the message\n")
}

Dead letter queue

The dead letter queue is a sub-queue. Each queue or subscription has its own dead letter queue. Dead letter queues store messages that have been explicitly dead lettered using the Receiver.DeadLetterMessage function.

Opening a dead letter queue is a configuration option when creating a Receiver.

NOTE: To create an azservicebus.Client, see the Authenticate the client section, using a service principal or a Service Bus connection string.

deadLetterReceiver, err := client.NewReceiverForQueue("<queue>",
  &azservicebus.ReceiverOptions{
    SubQueue: azservicebus.SubQueueDeadLetter,
  })
// or
// client.NewReceiverForSubscription("<topic>", "<subscription>",
//   &azservicebus.ReceiverOptions{
//     SubQueue: azservicebus.SubQueueDeadLetter,
//   })

if err != nil {
  panic(err)
}

For example code that receives messages with a Receiver, such as deadLetterReceiver above, see the Receive messages example.

Troubleshooting

Logging

This module uses the classification-based logging implementation in azcore. To enable console logging for all SDK modules, set the environment variable AZURE_SDK_GO_LOGGING to all.

Use the azcore/log package to control log event output or to enable logs for azservicebus only. For example:

import (
  "fmt"

  azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
  "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

// print log output to stdout
azlog.SetListener(func(event azlog.Event, s string) {
  fmt.Printf("[%s] %s\n", event, s)
})

// pick the set of events to log
azlog.SetEvents(
  // EventConn is used whenever we create a connection or any links (that is, receivers or senders).
  azservicebus.EventConn,
  // EventAuth is used when we're doing authentication/claims negotiation.
  azservicebus.EventAuth,
  // EventReceiver represents operations that happen on receivers.
  azservicebus.EventReceiver,
  // EventSender represents operations that happen on senders.
  azservicebus.EventSender,
  // EventAdmin is used for operations in the azservicebus/admin.Client
  azservicebus.EventAdmin,
)

Next steps

See the examples for using this library to send and receive messages to or from Service Bus queues, topics, and subscriptions.

Contributing

If you'd like to contribute to this library, please read the contributing guide to learn more about how to build and test the code.
