Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azservicebus] AdminClient can now create topics and subscriptions #16044

Merged
merged 4 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
enough to fit into a single batch.
- Receiving from sessions using a SessionReceiver, created using Client.AcceptSessionFor(Queue|Subscription)
or Client.AcceptNextSessionFor(Queue|Subscription).
- Can fully create, update, delete and list queues (and queue runtime properties) using the `AdministrationClient`.
- Can now renew a message lock for a ReceivedMessage using Receiver.RenewMessageLock()
- Can now renew a session lock for a SessionReceiver using SessionReceiver.RenewSessionLock()
- Can fully create, update, delete and list queues, topics and subscriptions using the `AdministrationClient`.
- Can renew message and session locks, using Receiver.RenewMessageLock() and SessionReceiver.RenewSessionLock(), respectively.

### Breaking Changes

Expand Down
175 changes: 0 additions & 175 deletions sdk/messaging/azservicebus/admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
package azservicebus

import (
"context"
"errors"
"net/http"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/atom"
Expand Down Expand Up @@ -46,175 +42,4 @@ func NewAdminClient(fullyQualifiedNamespace string, tokenCredential azcore.Token
return &AdminClient{em: em}, nil
}

// AddQueue creates a queue using defaults for all options.
func (ac *AdminClient) AddQueue(ctx context.Context, queueName string) (*QueueProperties, error) {
return ac.AddQueueWithProperties(ctx, &QueueProperties{
Name: queueName,
})
}

// CreateQueue creates a queue with configurable properties.
func (ac *AdminClient) AddQueueWithProperties(ctx context.Context, properties *QueueProperties) (*QueueProperties, error) {
return ac.createOrUpdateQueueImpl(ctx, properties, true)
}

// GetQueue gets a queue by name.
func (ac *AdminClient) GetQueue(ctx context.Context, queueName string) (*QueueProperties, error) {
name, desc, err := ac.getQueueImpl(ctx, queueName)

if err != nil {
return nil, err
}

return newQueueProperties(name, desc)
}

// GetQueueRuntimeProperties gets runtime properties of a queue, like the SizeInBytes, or ActiveMessageCount.
func (ac *AdminClient) GetQueueRuntimeProperties(ctx context.Context, queueName string) (*QueueRuntimeProperties, error) {
name, desc, err := ac.getQueueImpl(ctx, queueName)

if err != nil {
return nil, err
}

return newQueueRuntimeProperties(name, desc), nil
}

// QueueExists checks if a queue exists.
// Returns true if the queue is found
// (false, nil) if the queue is not found
// (false, err) if an error occurred while trying to check if the queue exists.
func (ac *AdminClient) QueueExists(ctx context.Context, queueName string) (bool, error) {
_, err := ac.GetQueue(ctx, queueName)

if err == nil {
return true, nil
}

var httpResponse azcore.HTTPResponse

if errors.As(err, &httpResponse) && httpResponse.RawResponse().StatusCode == 404 {
return false, nil
}

return false, err
}

// UpdateQueue updates an existing queue.
func (ac *AdminClient) UpdateQueue(ctx context.Context, properties *QueueProperties) (*QueueProperties, error) {
return ac.createOrUpdateQueueImpl(ctx, properties, false)
}

// DeleteQueue deletes a queue.
func (ac *AdminClient) DeleteQueue(ctx context.Context, queueName string) (*http.Response, error) {
resp, err := ac.em.Delete(ctx, "/"+queueName)

if err != nil {
return nil, err
}

return resp, nil
}

// ListQueuesOptions can be used to configure the ListQueues method.
type ListQueuesOptions struct {
// Top is the maximum size of each page of results.
Top int
// Skip is the starting index for the paging operation.
Skip int
}

// QueuePropertiesPager provides iteration over ListQueueProperties pages.
type QueuePropertiesPager interface {
// NextPage returns true if the pager advanced to the next page.
// Returns false if there are no more pages or an error occurred.
NextPage(context.Context) bool

// PageResponse returns the current QueueProperties.
PageResponse() []*QueueProperties

// Err returns the last error encountered while paging.
Err() error
}

// ListQueues lists queues.
func (ac *AdminClient) ListQueues(options *ListQueuesOptions) QueuePropertiesPager {
var pageSize int
var skip int

if options != nil {
skip = options.Skip
pageSize = options.Top
}

return &queuePropertiesPager{
adminClient: ac,
pageSize: pageSize,
skip: skip,
}
}

// ListQueuesRuntimePropertiesOptions can be used to configure the ListQueuesRuntimeProperties method.
type ListQueuesRuntimePropertiesOptions struct {
// Top is the maximum size of each page of results.
Top int
// Skip is the starting index for the paging operation.
Skip int
}

// QueueRuntimePropertiesPager provides iteration over ListQueueRuntimeProperties pages.
type QueueRuntimePropertiesPager interface {
// NextPage returns true if the pager advanced to the next page.
// Returns false if there are no more pages or an error occurred.
NextPage(context.Context) bool

// PageResponse returns the current QueueRuntimeProperties.
PageResponse() []*QueueRuntimeProperties

// Err returns the last error encountered while paging.
Err() error
}

// ListQueuesRuntimeProperties lists runtime properties for queues.
func (ac *AdminClient) ListQueuesRuntimeProperties(options *ListQueuesRuntimePropertiesOptions) QueueRuntimePropertiesPager {
var pageSize int
var skip int

if options != nil {
skip = options.Skip
pageSize = options.Top
}

return &queueRuntimePropertiesPager{
adminClient: ac,
pageSize: pageSize,
skip: skip,
}
}

// func (ac *AdminClient) GetNamespaceProperties() {}

// func (ac *AdminClient) CreateTopic() {}
// func (ac *AdminClient) CreateSubscription() {}
// func (ac *AdminClient) CreateRule() {}
// func (ac *AdminClient) DeleteTopic() {}
// func (ac *AdminClient) DeleteSubscription() {}
// func (ac *AdminClient) DeleteRule() {}
// func (ac *AdminClient) GetRule() {}
// func (ac *AdminClient) GetSubscription() {}
// func (ac *AdminClient) GetSubscriptionRuntimeProperties() {}
// func (ac *AdminClient) GetTopic() {}
// func (ac *AdminClient) GetTopicRuntimeProperties() {}
// func (ac *AdminClient) ListRules() {}
// func (ac *AdminClient) ListTopics() {}
// func (ac *AdminClient) ListTopicsRuntimeProperties() {}
// func (ac *AdminClient) ListSubscriptions() {}
// func (ac *AdminClient) ListSubscriptionsRuntimeProperties() {}

// func (ac *AdminClient) TopicExists() {}
// func (ac *AdminClient) SubscriptionExists() {}
// func (ac *AdminClient) RuleExists() {}

// func (ac *AdminClient) UpdateTopic() {}
// func (ac *AdminClient) UpdateSubscription() {}
// func (ac *AdminClient) UpdateRule() {}
97 changes: 0 additions & 97 deletions sdk/messaging/azservicebus/admin_client_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,103 +7,6 @@ import (
"time"
)

// QueueProperties represents the static properties of the queue.
type QueueProperties struct {
// Name of the queue relative to the namespace base address.
Name string

// LockDuration - The duration a message is locked when using the PeekLock receive mode.
// Default is 1 minute.
LockDuration *time.Duration

// MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory
// allocated for the queue.
// Default is 1024.
MaxSizeInMegabytes *int32

// RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection.
RequiresDuplicateDetection *bool

// RequiresSession indicates whether the queue supports the concept of sessions.
// Sessionful-messages follow FIFO ordering.
// Default is false.
RequiresSession *bool

// DefaultMessageTimeToLive is the duration after which the message expires, starting from when
// the message is sent to Service Bus. This is the default value used when TimeToLive is not
// set on a message itself.
DefaultMessageTimeToLive *time.Duration

// DeadLetteringOnMessageExpiration indicates whether this queue has dead letter
// support when a message expires.
DeadLetteringOnMessageExpiration *bool

// DuplicateDetectionHistoryTimeWindow is the duration of duplicate detection history.
// Default value is 10 minutes.
DuplicateDetectionHistoryTimeWindow *time.Duration

// MaxDeliveryCount is the maximum amount of times a message can be delivered before it is automatically
// sent to the dead letter queue.
// Default value is 10.
MaxDeliveryCount *int32

// EnableBatchedOperations indicates whether server-side batched operations are enabled.
EnableBatchedOperations *bool

// The current status of the queue.
Status *EntityStatus

// AutoDeleteOnIdle is the idle interval after which the queue is automatically deleted.
AutoDeleteOnIdle *time.Duration

// Indicates whether the queue is to be partitioned across multiple message brokers.
EnablePartitioning *bool

// ForwardTo is the name of the recipient entity to which all the messages sent to the queue
// are forwarded to.
ForwardTo *string

// ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages
ForwardDeadLetteredMessagesTo *string
}

// QueueRuntimeProperties represent dynamic properties of a queue, such as the ActiveMessageCount.
type QueueRuntimeProperties struct {
// Name is the name of the queue.
Name string

// SizeInBytes - The size of the queue, in bytes.
SizeInBytes int64

// CreatedAt is when the entity was created.
CreatedAt time.Time

// UpdatedAt is when the entity was last updated.
UpdatedAt time.Time

// AccessedAt is when the entity was last updated.
AccessedAt time.Time

// TotalMessageCount is the number of messages in the queue.
TotalMessageCount int64

// ActiveMessageCount is the number of active messages in the entity.
ActiveMessageCount int32

// DeadLetterMessageCount is the number of dead-lettered messages in the entity.
DeadLetterMessageCount int32

// ScheduledMessageCount is the number of messages that are scheduled to be enqueued.
ScheduledMessageCount int32

// TransferDeadLetterMessageCount is the number of messages transfer-messages which are dead-lettered
// into transfer-dead-letter subqueue.
TransferDeadLetterMessageCount int32

// TransferMessageCount is the number of messages which are yet to be transferred/forwarded to destination entity.
TransferMessageCount int32
}

// EntityStatus represents the current status of the entity.
type EntityStatus string

Expand Down
Loading