Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azservicebus] Adding authorization and subscription rule support #17616

Merged
merged 18 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

### Features Added

- Exported log.Event constants for azservicebus. This will maek them easier to
discover and they are also documented. The text of log messages themselves
are not guaranteed to be stable. (#TBD)
- Exported log.Event constants for azservicebus. This will make them easier to
discover and they are also documented. NOTE: The log messages themselves
are not guaranteed to be stable. (#17596)
- `admin.Client` can now manage authorization rules and subscription filters and
actions. (#17616)

### Breaking Changes

Expand Down
20 changes: 20 additions & 0 deletions sdk/messaging/azservicebus/admin/admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package admin

import (
"context"
"errors"
"fmt"
"net/http"
"time"
Expand Down Expand Up @@ -214,3 +215,22 @@ func (ep *entityPager[TFeed, T, TOutput]) Fetcher(ctx context.Context) ([]TOutpu

return finalItems, nil
}

// mapATOMError checks if the error is a legitimate 404 or a "fake" 404 (where the service succeeded but gave us back an
// empty feed instead). This "fake" behavior comes about because the API here is not truly a CRUD API (it's extremely close)
// so we have to do some small workarounds.
// NOTE: we had a debate about whether to return a nil instance or try to fabricate an HTTP 404 response instead (even if
// one didn't come back) and went with 'nil' to avoid having a fake HTTP response, which would have been confusing.
func mapATOMError[T any](err error) (*T, error) {
jhendrixMSFT marked this conversation as resolved.
Show resolved Hide resolved
if errors.Is(err, atom.ErrFeedEmpty) {
return nil, nil
}

var respError *azcore.ResponseError

if errors.As(err, &respError) && respError.StatusCode == http.StatusNotFound {
return nil, nil
}

return nil, err
}
90 changes: 90 additions & 0 deletions sdk/messaging/azservicebus/admin/admin_client_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@

package admin

import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/atom"
)

// EntityStatus represents the current status of the entity.
type EntityStatus string

Expand All @@ -16,3 +22,87 @@ const (
// EntityStatusReceiveDisabled indicates that an entity cannot be used for receiving.
EntityStatusReceiveDisabled EntityStatus = "ReceiveDisabled"
)

type (
// AccessRight is an access right (Manage, Send, Listen) for an AuthorizationRule.
AccessRight string

// AuthorizationRule is a rule with keys and rights associated with an entity.
AuthorizationRule struct {
// AccessRights for this rule.
AccessRights []AccessRight

// KeyName for this rule.
KeyName *string

// CreatedTime for this rule.
CreatedTime *time.Time

// ModifiedTime for this rule.
ModifiedTime *time.Time

// PrimaryKey for this rule.
PrimaryKey *string

// SecondaryKey for this rule.
SecondaryKey *string
}
)

const (
// AccessRightManage allows changes to an entity.
AccessRightManage AccessRight = "Manage"
// AccessRightSend allows you to send messages to this entity.
AccessRightSend AccessRight = "Send"
// AccessRightListen allows you to receive messages from this entity.
AccessRightListen AccessRight = "Listen"
)

func internalAccessRightsToPublic(internalRules []atom.AuthorizationRule) []AuthorizationRule {
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
var rules []AuthorizationRule

for _, rule := range internalRules {
var accessRights []AccessRight

for _, right := range rule.Rights {
accessRights = append(accessRights, AccessRight(right))
}

rules = append(rules, AuthorizationRule{
AccessRights: accessRights,
KeyName: rule.KeyName,
CreatedTime: rule.CreatedTime,
ModifiedTime: rule.ModifiedTime,
PrimaryKey: rule.PrimaryKey,
SecondaryKey: rule.SecondaryKey,
})
}

return rules
}

func publicAccessRightsToInternal(rules []AuthorizationRule) []atom.AuthorizationRule {
var internalRules []atom.AuthorizationRule

for _, rule := range rules {
var accessRights []string

for _, right := range rule.AccessRights {
accessRights = append(accessRights, string(right))
}

internalRules = append(internalRules, atom.AuthorizationRule{
Type: "SharedAccessAuthorizationRule",
ClaimType: "SharedAccessKey",
ClaimValue: "None",
Rights: accessRights,
KeyName: rule.KeyName,
CreatedTime: rule.CreatedTime,
ModifiedTime: rule.ModifiedTime,
PrimaryKey: rule.PrimaryKey,
SecondaryKey: rule.SecondaryKey,
})
}

return internalRules
}
26 changes: 11 additions & 15 deletions sdk/messaging/azservicebus/admin/admin_client_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package admin

import (
"context"
"errors"
"net/http"
"time"

Expand Down Expand Up @@ -72,6 +71,9 @@ type QueueProperties struct {

// UserMetadata is custom metadata that user can associate with the queue.
UserMetadata *string

// AuthorizationRules are the authorization rules for this entity.
AuthorizationRules []AuthorizationRule
}

// QueueRuntimeProperties represent dynamic properties of a queue, such as the ActiveMessageCount.
Expand Down Expand Up @@ -178,11 +180,7 @@ func (ac *Client) GetQueue(ctx context.Context, queueName string, options *GetQu
_, err := ac.em.Get(ctx, "/"+queueName, &atomResp)

if err != nil {
if errors.Is(err, atom.ErrFeedEmpty) {
return nil, nil
}

return nil, err
return mapATOMError[GetQueueResponse](err)
}

queueItem, err := newQueueItem(atomResp)
Expand Down Expand Up @@ -213,11 +211,7 @@ func (ac *Client) GetQueueRuntimeProperties(ctx context.Context, queueName strin
_, err := ac.em.Get(ctx, "/"+queueName, &atomResp)

if err != nil {
if errors.Is(err, atom.ErrFeedEmpty) {
return nil, nil
}

return nil, err
return mapATOMError[GetQueueRuntimePropertiesResponse](err)
}

item, err := newQueueRuntimePropertiesItem(atomResp)
Expand Down Expand Up @@ -264,8 +258,8 @@ type QueueItem struct {
QueueProperties
}

// ListQueues lists queues.
func (ac *Client) ListQueues(options *ListQueuesOptions) *runtime.Pager[ListQueuesResponse] {
// NewListQueuesPager creates a pager that can be used to list queues.
func (ac *Client) NewListQueuesPager(options *ListQueuesOptions) *runtime.Pager[ListQueuesResponse] {
var pageSize int32

if options != nil {
Expand Down Expand Up @@ -314,8 +308,8 @@ type QueueRuntimePropertiesItem struct {
QueueRuntimeProperties
}

// ListQueuesRuntimeProperties lists runtime properties for queues.
func (ac *Client) ListQueuesRuntimeProperties(options *ListQueuesRuntimePropertiesOptions) *runtime.Pager[ListQueuesRuntimePropertiesResponse] {
// NewListQueuesRuntimePropertiesPager creates a pager that lists the runtime properties for queues.
func (ac *Client) NewListQueuesRuntimePropertiesPager(options *ListQueuesRuntimePropertiesOptions) *runtime.Pager[ListQueuesRuntimePropertiesResponse] {
var pageSize int32

if options != nil {
Expand Down Expand Up @@ -397,6 +391,7 @@ func newQueueEnvelope(props *QueueProperties, tokenProvider auth.TokenProvider)
ForwardTo: props.ForwardTo,
ForwardDeadLetteredMessagesTo: props.ForwardDeadLetteredMessagesTo,
UserMetadata: props.UserMetadata,
AuthorizationRules: publicAccessRightsToInternal(props.AuthorizationRules),
}

return atom.WrapWithQueueEnvelope(qpr, tokenProvider)
Expand All @@ -421,6 +416,7 @@ func newQueueItem(env *atom.QueueEnvelope) (*QueueItem, error) {
ForwardTo: desc.ForwardTo,
ForwardDeadLetteredMessagesTo: desc.ForwardDeadLetteredMessagesTo,
UserMetadata: desc.UserMetadata,
AuthorizationRules: internalAccessRightsToPublic(desc.AuthorizationRules),
}

return &QueueItem{
Expand Down
Loading