Skip to content

Commit

Permalink
[azservicebus] Cleanup/changes from API review (removal of processor …
Browse files Browse the repository at this point in the history
…and singular receives) (Azure#15901)

API trimming/modifications based on review:

* Unexport Processor
* Unexport the singular message APIs (ReceiveMessage, ReceiveDeferredMessage) in favor of their plural counterparts.
* Deleting processor sample but would have anyways since we had an example file for it.
* Remove mentions of the Processor from our readme/migrationguide
* Remove mentions of the Processor in doc comments on exported types
  • Loading branch information
richardpark-msft authored and jhendrixMSFT committed Jan 12, 2022
1 parent 2c74fc8 commit d3eec8f
Show file tree
Hide file tree
Showing 19 changed files with 127 additions and 481 deletions.
88 changes: 5 additions & 83 deletions sdk/messaging/azservicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,72 +157,8 @@ if !added {

### Receive messages

Once you've created a [Client][godoc_client] you can create a [Processor][godoc_processor], which will allow you to receive messages.

The [Processor][godoc_processor] handles error recovery internally, making it a good fit for
applications where the intention is to stream and process events for an extended period
of time.

> NOTE: Creating a `client` is covered in the ["Authenticate the client"](#authenticate-the-client) section of the readme.
```go
processor, err := client.NewProcessorForQueue(
"<queue>",
&azservicebus.ProcessorOptions{
// NOTE: this is a parameter you'll want to tune. It controls the number of
// active message `handleMessage` calls that the processor will allow at any time.
MaxConcurrentCalls: 1,
ReceiveMode: azservicebus.PeekLock,
ManualComplete: false,
},
)
// or
// client.NewProcessorForSubscription("<topic>", "<subscription>")

if err != nil {
log.Fatalf("Failed to create the processor: %s", err.Error())
}

handleMessage := func(message *azservicebus.ReceivedMessage) error {
// This is where your logic for handling messages goes
yourLogicForProcessing(message)
return nil
}

handleError := func(err error) {
// handleError will be called on errors that are noteworthy
// but the Processor internally will continue to attempt to
// recover.

// NOTE: errors returned from `handleMessage` above will also be
// sent here, but do not affect the running of the Processor
// itself.

// We'll just print these out, as they're informational and
// can indicate if there are longer lived problems that we might
// want to resolve manually (for instance, longer term network
// outages, or issues affecting your `handleMessage` handler)
log.Printf("Error: %s", err.Error())
}

err := processor.Start(context.TODO(), handleMessage, handleError)

if err != nil {
log.Printf("Processor loop has exited: %s", err.Error())
}

err := processor.Close(context.TODO())

if err != nil {
log.Printf("Processor failed to close: %s", err.Error())
}
```

Once you've created a [Client][godoc_client] you can create a [Receiver][godoc_receiver], which will allow you to receive messages.

The [Receiver][godoc_receiver] is a good fit for applications that want to receive messages in fixed increments, rather than
continually streaming messages, as the [Processor][godoc_processor] does.

> NOTE: Creating a `client` is covered in the ["Authenticate the client"](#authenticate-the-client) section of the readme.
```go
Expand Down Expand Up @@ -256,7 +192,7 @@ messages, err := receiver.ReceiveMessages(context.TODO(),
)

if err != nil {
log.Fatalf("Failed to get messages: %s", err.Error())
panic(err)
}

for _, message := range messages {
Expand All @@ -266,33 +202,21 @@ for _, message := range messages {
// For more information about settling messages:
// https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
if err := receiver.CompleteMessage(message); err != nil {
log.Printf("Error completing message: %s", err.Error())
panic(err)
}
}
```

### Dead letter queue

The dead letter queue is a **sub-queue**. Each queue or subscription has its own dead letter queue. Dead letter queues store
messages that have been explicitly dead lettered via the [Processor.DeadLetterMessage][godoc_processor_deadlettermessage]
or [Receiver.DeadLetterMessage][godoc_receiver_deadlettermessage] functions.
messages that have been explicitly dead lettered using the [Receiver.DeadLetterMessage][godoc_receiver_deadlettermessage] function.

Opening a dead letter queue is just a configuration option when creating a [Processor][godoc_processor] or [Receiver][godoc_receiver].
Opening a dead letter queue is just a configuration option when creating a [Receiver][godoc_receiver].

> NOTE: Creating a `client` is covered in the ["Authenticate the client"](#authenticate-the-client) section of the readme.
```go

deadLetterReceiver, err := client.NewProcessorForQueue("<queue>",
&azservicebus.ProcessorOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
})
// or
// client.NewProcessorForSubscription("<topic>", "<subscription>",
// &azservicebus.ProcessorOptions{
// SubQueue: azservicebus.SubQueueDeadLetter,
// })

deadLetterReceiver, err := client.NewReceiverForQueue("<queue>",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
Expand All @@ -304,7 +228,7 @@ deadLetterReceiver, err := client.NewReceiverForQueue("<queue>",
// })
```

To see some example code for receiving messages using the Processor or Receiver see the ["Receive messages"](#receive-messages) sample.
To see some example code for receiving messages using the Receiver see the ["Receive messages"](#receive-messages) sample.

## Next steps

Expand Down Expand Up @@ -332,8 +256,6 @@ If you'd like to contribute to this library, please read the [contributing guide
[godoc_receiver]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/#Receiver
[godoc_receiver_completemessage]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/#Receiver.CompleteMessage
[godoc_receiver_deadlettermessage]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/#Receiver.DeadLetterMessage
[godoc_processor]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/#Processor
[godoc_processor_deadlettermessage]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/#Processor.DeadLetterMessage
[godoc_newsender]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/#Client.NewSender
[godoc_newreceiver_queue]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/#Client.NewReceiverForQueue
[godoc_newreceiver_subscription]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/#Client.NewReceiverForSubscription
2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/admin_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestAdminClient_Queue_Forwarding(t *testing.T) {
receiver, err := client.NewReceiverForQueue(forwardToQueueName, nil)
require.NoError(t, err)

forwardedMessage, err := receiver.ReceiveMessage(context.Background(), nil)
forwardedMessage, err := receiver.receiveMessage(context.Background(), nil)
require.NoError(t, err)

require.EqualValues(t, "this message will be auto-forwarded", string(forwardedMessage.Body))
Expand Down
32 changes: 2 additions & 30 deletions sdk/messaging/azservicebus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/devigned/tab"
)

// Client provides methods to create Sender, Receiver and Processor
// Client provides methods to create Sender and Receiver
// instances to send and receive messages from Service Bus.
type Client struct {
config clientConfig
Expand Down Expand Up @@ -120,34 +120,6 @@ func newClientImpl(config clientConfig, options *ClientOptions) (*Client, error)
return client, err
}

// NewProcessor creates a Processor for a queue.
func (client *Client) NewProcessorForQueue(queue string, options *ProcessorOptions) (*Processor, error) {
id, cleanupOnClose := client.getCleanupForCloseable()

processor, err := newProcessor(client.namespace, &entity{Queue: queue}, cleanupOnClose, options)

if err != nil {
return nil, err
}

client.addCloseable(id, processor)
return processor, nil
}

// NewProcessor creates a Processor for a subscription.
func (client *Client) NewProcessorForSubscription(topic string, subscription string, options *ProcessorOptions) (*Processor, error) {
id, cleanupOnClose := client.getCleanupForCloseable()

processor, err := newProcessor(client.namespace, &entity{Topic: topic, Subscription: subscription}, cleanupOnClose, options)

if err != nil {
return nil, err
}

client.addCloseable(id, processor)
return processor, nil
}

// NewReceiver creates a Receiver for a queue. A receiver allows you to receive messages.
func (client *Client) NewReceiverForQueue(queue string, options *ReceiverOptions) (*Receiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
Expand Down Expand Up @@ -279,7 +251,7 @@ func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topi
return sessionReceiver, nil
}

// Close closes the current connection Service Bus as well as any Sender, Receiver or Processors created
// Close closes the current connection Service Bus as well as any Senders or Receivers created
// using this client.
func (client *Client) Close(ctx context.Context) error {
var lastError error
Expand Down
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestNewClientUnitTests(t *testing.T) {
require.EqualValues(t, 1, ns.AMQPLinks.Closed)

client, ns = setupClient()
_, err = client.NewProcessorForQueue("hello", nil)
_, err = newProcessorForQueue(client, "hello", nil)

require.NoError(t, err)
require.EqualValues(t, 1, len(client.links))
Expand All @@ -149,7 +149,7 @@ func TestNewClientUnitTests(t *testing.T) {
require.EqualValues(t, 1, ns.AMQPLinks.Closed)

client, ns = setupClient()
_, err = client.NewProcessorForSubscription("hello", "world", nil)
_, err = newProcessorForSubscription(client, "hello", "world", nil)

require.NoError(t, err)
require.EqualValues(t, 1, len(client.links))
Expand Down
81 changes: 0 additions & 81 deletions sdk/messaging/azservicebus/example_processor_test.go

This file was deleted.

10 changes: 6 additions & 4 deletions sdk/messaging/azservicebus/example_session_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ func ExampleClient_AcceptSessionForQueue() {
exitOnError("Failed to create session receiver", err)

// session receivers function the same as any other receiver
message, err := sessionReceiver.ReceiveMessage(context.TODO(), nil)
messages, err := sessionReceiver.ReceiveMessages(context.TODO(), 5, nil)
exitOnError("Failed to receive a message", err)

err = sessionReceiver.CompleteMessage(context.TODO(), message)
exitOnError("Failed to complete message", err)
for _, message := range messages {
err = sessionReceiver.CompleteMessage(context.TODO(), message)
exitOnError("Failed to complete message", err)

fmt.Printf("Received message from session ID \"%s\" and completed it", *message.SessionID)
fmt.Printf("Received message from session ID \"%s\" and completed it", *message.SessionID)
}
}

func ExampleClient_AcceptNextSessionForQueue() {
Expand Down
4 changes: 0 additions & 4 deletions sdk/messaging/azservicebus/example_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ func exitOnError(message string, err error) {
log.Panicf("(error in example): %s: %s", message, err.Error())
}

func yourLogicForProcessing(message *azservicebus.ReceivedMessage) {
log.Printf("Message received")
}

// these just make it so our examples don't have to have a bunch of extra declarations
// for unrelated entities.
var connectionString string
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/internal/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type ReceiveMode int

const (
// PeekLock will lock messages as they are received and can be settled
// using the Receiver or Processor's (Complete|Abandon|DeadLetter|Defer)Message
// using the Receiver's (Complete|Abandon|DeadLetter|Defer)Message
// functions.
PeekLock ReceiveMode = 0
// ReceiveAndDelete will delete messages as they are received.
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type (
// ReceivedMessage is a received message from a Client.NewReceiver() or Client.NewProcessor().
// ReceivedMessage is a received message from a Client.NewReceiver().
ReceivedMessage struct {
Message

Expand Down
Loading

0 comments on commit d3eec8f

Please sign in to comment.